rxrpc: Transmit ACKs at the point of generation
[linux-block.git] / net / rxrpc / sendmsg.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* AF_RXRPC sendmsg() implementation.
3  *
4  * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9
10 #include <linux/net.h>
11 #include <linux/gfp.h>
12 #include <linux/skbuff.h>
13 #include <linux/export.h>
14 #include <linux/sched/signal.h>
15
16 #include <net/sock.h>
17 #include <net/af_rxrpc.h>
18 #include "ar-internal.h"
19
20 /*
21  * Return true if there's sufficient Tx queue space.
22  */
23 static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
24 {
25         if (_tx_win)
26                 *_tx_win = call->tx_bottom;
27         return call->tx_prepared - call->tx_bottom < 256;
28 }
29
30 /*
31  * Wait for space to appear in the Tx queue or a signal to occur.
32  */
33 static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
34                                          struct rxrpc_call *call,
35                                          long *timeo)
36 {
37         for (;;) {
38                 set_current_state(TASK_INTERRUPTIBLE);
39                 if (rxrpc_check_tx_space(call, NULL))
40                         return 0;
41
42                 if (call->state >= RXRPC_CALL_COMPLETE)
43                         return call->error;
44
45                 if (signal_pending(current))
46                         return sock_intr_errno(*timeo);
47
48                 trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
49                 *timeo = schedule_timeout(*timeo);
50         }
51 }
52
53 /*
54  * Wait for space to appear in the Tx queue uninterruptibly, but with
55  * a timeout of 2*RTT if no progress was made and a signal occurred.
56  */
57 static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
58                                             struct rxrpc_call *call)
59 {
60         rxrpc_seq_t tx_start, tx_win;
61         signed long rtt, timeout;
62
63         rtt = READ_ONCE(call->peer->srtt_us) >> 3;
64         rtt = usecs_to_jiffies(rtt) * 2;
65         if (rtt < 2)
66                 rtt = 2;
67
68         timeout = rtt;
69         tx_start = smp_load_acquire(&call->acks_hard_ack);
70
71         for (;;) {
72                 set_current_state(TASK_UNINTERRUPTIBLE);
73
74                 if (rxrpc_check_tx_space(call, &tx_win))
75                         return 0;
76
77                 if (call->state >= RXRPC_CALL_COMPLETE)
78                         return call->error;
79
80                 if (timeout == 0 &&
81                     tx_win == tx_start && signal_pending(current))
82                         return -EINTR;
83
84                 if (tx_win != tx_start) {
85                         timeout = rtt;
86                         tx_start = tx_win;
87                 }
88
89                 trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
90                 timeout = schedule_timeout(timeout);
91         }
92 }
93
94 /*
95  * Wait for space to appear in the Tx queue uninterruptibly.
96  */
97 static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
98                                             struct rxrpc_call *call,
99                                             long *timeo)
100 {
101         for (;;) {
102                 set_current_state(TASK_UNINTERRUPTIBLE);
103                 if (rxrpc_check_tx_space(call, NULL))
104                         return 0;
105
106                 if (call->state >= RXRPC_CALL_COMPLETE)
107                         return call->error;
108
109                 trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
110                 *timeo = schedule_timeout(*timeo);
111         }
112 }
113
114 /*
115  * wait for space to appear in the transmit/ACK window
116  * - caller holds the socket locked
117  */
118 static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
119                                     struct rxrpc_call *call,
120                                     long *timeo,
121                                     bool waitall)
122 {
123         DECLARE_WAITQUEUE(myself, current);
124         int ret;
125
126         _enter(",{%u,%u,%u,%u}",
127                call->tx_bottom, call->acks_hard_ack, call->tx_top, call->tx_winsize);
128
129         add_wait_queue(&call->waitq, &myself);
130
131         switch (call->interruptibility) {
132         case RXRPC_INTERRUPTIBLE:
133                 if (waitall)
134                         ret = rxrpc_wait_for_tx_window_waitall(rx, call);
135                 else
136                         ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
137                 break;
138         case RXRPC_PREINTERRUPTIBLE:
139         case RXRPC_UNINTERRUPTIBLE:
140         default:
141                 ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
142                 break;
143         }
144
145         remove_wait_queue(&call->waitq, &myself);
146         set_current_state(TASK_RUNNING);
147         _leave(" = %d", ret);
148         return ret;
149 }
150
151 /*
152  * Notify the owner of the call that the transmit phase is ended and the last
153  * packet has been queued.
154  */
155 static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
156                                 rxrpc_notify_end_tx_t notify_end_tx)
157 {
158         if (notify_end_tx)
159                 notify_end_tx(&rx->sk, call, call->user_call_ID);
160 }
161
162 /*
163  * Queue a DATA packet for transmission, set the resend timeout and send
164  * the packet immediately.  Returns the error from rxrpc_send_data_packet()
165  * in case the caller wants to do something with it.
166  */
167 static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
168                                struct rxrpc_txbuf *txb,
169                                rxrpc_notify_end_tx_t notify_end_tx)
170 {
171         unsigned long now;
172         rxrpc_seq_t seq = txb->seq;
173         bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;
174
175         rxrpc_inc_stat(call->rxnet, stat_tx_data);
176
177         ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
178
179         /* We have to set the timestamp before queueing as the retransmit
180          * algorithm can see the packet as soon as we queue it.
181          */
182         txb->last_sent = ktime_get_real();
183
184         if (last)
185                 trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
186         else
187                 trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
188
189         /* Add the packet to the call's output buffer */
190         spin_lock(&call->tx_lock);
191         poke = list_empty(&call->tx_sendmsg);
192         list_add_tail(&txb->call_link, &call->tx_sendmsg);
193         call->tx_prepared = seq;
194         spin_unlock(&call->tx_lock);
195
196         if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
197                 _debug("________awaiting reply/ACK__________");
198                 write_lock(&call->state_lock);
199                 switch (call->state) {
200                 case RXRPC_CALL_CLIENT_SEND_REQUEST:
201                         call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
202                         rxrpc_notify_end_tx(rx, call, notify_end_tx);
203                         break;
204                 case RXRPC_CALL_SERVER_ACK_REQUEST:
205                         call->state = RXRPC_CALL_SERVER_SEND_REPLY;
206                         now = jiffies;
207                         WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
208                         if (call->ackr_reason == RXRPC_ACK_DELAY)
209                                 call->ackr_reason = 0;
210                         trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
211                         if (!last)
212                                 break;
213                         fallthrough;
214                 case RXRPC_CALL_SERVER_SEND_REPLY:
215                         call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
216                         rxrpc_notify_end_tx(rx, call, notify_end_tx);
217                         break;
218                 default:
219                         break;
220                 }
221                 write_unlock(&call->state_lock);
222         }
223
224         if (poke)
225                 rxrpc_poke_call(call, rxrpc_call_poke_start);
226 }
227
228 /*
229  * send data through a socket
230  * - must be called in process context
231  * - The caller holds the call user access mutex, but not the socket lock.
232  */
233 static int rxrpc_send_data(struct rxrpc_sock *rx,
234                            struct rxrpc_call *call,
235                            struct msghdr *msg, size_t len,
236                            rxrpc_notify_end_tx_t notify_end_tx,
237                            bool *_dropped_lock)
238 {
239         struct rxrpc_txbuf *txb;
240         struct sock *sk = &rx->sk;
241         enum rxrpc_call_state state;
242         long timeo;
243         bool more = msg->msg_flags & MSG_MORE;
244         int ret, copied = 0;
245
246         timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
247
248         /* this should be in poll */
249         sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
250
251 reload:
252         ret = -EPIPE;
253         if (sk->sk_shutdown & SEND_SHUTDOWN)
254                 goto maybe_error;
255         state = READ_ONCE(call->state);
256         ret = -ESHUTDOWN;
257         if (state >= RXRPC_CALL_COMPLETE)
258                 goto maybe_error;
259         ret = -EPROTO;
260         if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
261             state != RXRPC_CALL_SERVER_ACK_REQUEST &&
262             state != RXRPC_CALL_SERVER_SEND_REPLY)
263                 goto maybe_error;
264
265         ret = -EMSGSIZE;
266         if (call->tx_total_len != -1) {
267                 if (len - copied > call->tx_total_len)
268                         goto maybe_error;
269                 if (!more && len - copied != call->tx_total_len)
270                         goto maybe_error;
271         }
272
273         txb = call->tx_pending;
274         call->tx_pending = NULL;
275         if (txb)
276                 rxrpc_see_txbuf(txb, rxrpc_txbuf_see_send_more);
277
278         do {
279                 if (!txb) {
280                         size_t remain, bufsize, chunk, offset;
281
282                         _debug("alloc");
283
284                         if (!rxrpc_check_tx_space(call, NULL))
285                                 goto wait_for_space;
286
287                         /* Work out the maximum size of a packet.  Assume that
288                          * the security header is going to be in the padded
289                          * region (enc blocksize), but the trailer is not.
290                          */
291                         remain = more ? INT_MAX : msg_data_left(msg);
292                         ret = call->conn->security->how_much_data(call, remain,
293                                                                   &bufsize, &chunk, &offset);
294                         if (ret < 0)
295                                 goto maybe_error;
296
297                         _debug("SIZE: %zu/%zu @%zu", chunk, bufsize, offset);
298
299                         /* create a buffer that we can retain until it's ACK'd */
300                         ret = -ENOMEM;
301                         txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_DATA,
302                                                 GFP_KERNEL);
303                         if (!txb)
304                                 goto maybe_error;
305
306                         txb->offset = offset;
307                         txb->space -= offset;
308                         txb->space = min_t(size_t, chunk, txb->space);
309                 }
310
311                 _debug("append");
312
313                 /* append next segment of data to the current buffer */
314                 if (msg_data_left(msg) > 0) {
315                         size_t copy = min_t(size_t, txb->space, msg_data_left(msg));
316
317                         _debug("add %zu", copy);
318                         if (!copy_from_iter_full(txb->data + txb->offset, copy,
319                                                  &msg->msg_iter))
320                                 goto efault;
321                         _debug("added");
322                         txb->space -= copy;
323                         txb->len += copy;
324                         txb->offset += copy;
325                         copied += copy;
326                         if (call->tx_total_len != -1)
327                                 call->tx_total_len -= copy;
328                 }
329
330                 /* check for the far side aborting the call or a network error
331                  * occurring */
332                 if (call->state == RXRPC_CALL_COMPLETE)
333                         goto call_terminated;
334
335                 /* add the packet to the send queue if it's now full */
336                 if (!txb->space ||
337                     (msg_data_left(msg) == 0 && !more)) {
338                         if (msg_data_left(msg) == 0 && !more) {
339                                 txb->wire.flags |= RXRPC_LAST_PACKET;
340                                 __set_bit(RXRPC_TXBUF_LAST, &txb->flags);
341                         }
342                         else if (call->tx_top - call->acks_hard_ack <
343                                  call->tx_winsize)
344                                 txb->wire.flags |= RXRPC_MORE_PACKETS;
345
346                         ret = call->security->secure_packet(call, txb);
347                         if (ret < 0)
348                                 goto out;
349
350                         rxrpc_queue_packet(rx, call, txb, notify_end_tx);
351                         txb = NULL;
352                 }
353         } while (msg_data_left(msg) > 0);
354
355 success:
356         ret = copied;
357         if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) {
358                 read_lock(&call->state_lock);
359                 if (call->error < 0)
360                         ret = call->error;
361                 read_unlock(&call->state_lock);
362         }
363 out:
364         call->tx_pending = txb;
365         _leave(" = %d", ret);
366         return ret;
367
368 call_terminated:
369         rxrpc_put_txbuf(txb, rxrpc_txbuf_put_send_aborted);
370         _leave(" = %d", call->error);
371         return call->error;
372
373 maybe_error:
374         if (copied)
375                 goto success;
376         goto out;
377
378 efault:
379         ret = -EFAULT;
380         goto out;
381
382 wait_for_space:
383         ret = -EAGAIN;
384         if (msg->msg_flags & MSG_DONTWAIT)
385                 goto maybe_error;
386         mutex_unlock(&call->user_mutex);
387         *_dropped_lock = true;
388         ret = rxrpc_wait_for_tx_window(rx, call, &timeo,
389                                        msg->msg_flags & MSG_WAITALL);
390         if (ret < 0)
391                 goto maybe_error;
392         if (call->interruptibility == RXRPC_INTERRUPTIBLE) {
393                 if (mutex_lock_interruptible(&call->user_mutex) < 0) {
394                         ret = sock_intr_errno(timeo);
395                         goto maybe_error;
396                 }
397         } else {
398                 mutex_lock(&call->user_mutex);
399         }
400         *_dropped_lock = false;
401         goto reload;
402 }
403
404 /*
405  * extract control messages from the sendmsg() control buffer
406  */
407 static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)
408 {
409         struct cmsghdr *cmsg;
410         bool got_user_ID = false;
411         int len;
412
413         if (msg->msg_controllen == 0)
414                 return -EINVAL;
415
416         for_each_cmsghdr(cmsg, msg) {
417                 if (!CMSG_OK(msg, cmsg))
418                         return -EINVAL;
419
420                 len = cmsg->cmsg_len - sizeof(struct cmsghdr);
421                 _debug("CMSG %d, %d, %d",
422                        cmsg->cmsg_level, cmsg->cmsg_type, len);
423
424                 if (cmsg->cmsg_level != SOL_RXRPC)
425                         continue;
426
427                 switch (cmsg->cmsg_type) {
428                 case RXRPC_USER_CALL_ID:
429                         if (msg->msg_flags & MSG_CMSG_COMPAT) {
430                                 if (len != sizeof(u32))
431                                         return -EINVAL;
432                                 p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg);
433                         } else {
434                                 if (len != sizeof(unsigned long))
435                                         return -EINVAL;
436                                 p->call.user_call_ID = *(unsigned long *)
437                                         CMSG_DATA(cmsg);
438                         }
439                         got_user_ID = true;
440                         break;
441
442                 case RXRPC_ABORT:
443                         if (p->command != RXRPC_CMD_SEND_DATA)
444                                 return -EINVAL;
445                         p->command = RXRPC_CMD_SEND_ABORT;
446                         if (len != sizeof(p->abort_code))
447                                 return -EINVAL;
448                         p->abort_code = *(unsigned int *)CMSG_DATA(cmsg);
449                         if (p->abort_code == 0)
450                                 return -EINVAL;
451                         break;
452
453                 case RXRPC_CHARGE_ACCEPT:
454                         if (p->command != RXRPC_CMD_SEND_DATA)
455                                 return -EINVAL;
456                         p->command = RXRPC_CMD_CHARGE_ACCEPT;
457                         if (len != 0)
458                                 return -EINVAL;
459                         break;
460
461                 case RXRPC_EXCLUSIVE_CALL:
462                         p->exclusive = true;
463                         if (len != 0)
464                                 return -EINVAL;
465                         break;
466
467                 case RXRPC_UPGRADE_SERVICE:
468                         p->upgrade = true;
469                         if (len != 0)
470                                 return -EINVAL;
471                         break;
472
473                 case RXRPC_TX_LENGTH:
474                         if (p->call.tx_total_len != -1 || len != sizeof(__s64))
475                                 return -EINVAL;
476                         p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg);
477                         if (p->call.tx_total_len < 0)
478                                 return -EINVAL;
479                         break;
480
481                 case RXRPC_SET_CALL_TIMEOUT:
482                         if (len & 3 || len < 4 || len > 12)
483                                 return -EINVAL;
484                         memcpy(&p->call.timeouts, CMSG_DATA(cmsg), len);
485                         p->call.nr_timeouts = len / 4;
486                         if (p->call.timeouts.hard > INT_MAX / HZ)
487                                 return -ERANGE;
488                         if (p->call.nr_timeouts >= 2 && p->call.timeouts.idle > 60 * 60 * 1000)
489                                 return -ERANGE;
490                         if (p->call.nr_timeouts >= 3 && p->call.timeouts.normal > 60 * 60 * 1000)
491                                 return -ERANGE;
492                         break;
493
494                 default:
495                         return -EINVAL;
496                 }
497         }
498
499         if (!got_user_ID)
500                 return -EINVAL;
501         if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)
502                 return -EINVAL;
503         _leave(" = 0");
504         return 0;
505 }
506
507 /*
508  * Create a new client call for sendmsg().
509  * - Called with the socket lock held, which it must release.
510  * - If it returns a call, the call's lock will need releasing by the caller.
511  */
512 static struct rxrpc_call *
513 rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
514                                   struct rxrpc_send_params *p)
515         __releases(&rx->sk.sk_lock.slock)
516         __acquires(&call->user_mutex)
517 {
518         struct rxrpc_conn_parameters cp;
519         struct rxrpc_call *call;
520         struct key *key;
521
522         DECLARE_SOCKADDR(struct sockaddr_rxrpc *, srx, msg->msg_name);
523
524         _enter("");
525
526         if (!msg->msg_name) {
527                 release_sock(&rx->sk);
528                 return ERR_PTR(-EDESTADDRREQ);
529         }
530
531         key = rx->key;
532         if (key && !rx->key->payload.data[0])
533                 key = NULL;
534
535         memset(&cp, 0, sizeof(cp));
536         cp.local                = rx->local;
537         cp.key                  = rx->key;
538         cp.security_level       = rx->min_sec_level;
539         cp.exclusive            = rx->exclusive | p->exclusive;
540         cp.upgrade              = p->upgrade;
541         cp.service_id           = srx->srx_service;
542         call = rxrpc_new_client_call(rx, &cp, srx, &p->call, GFP_KERNEL,
543                                      atomic_inc_return(&rxrpc_debug_id));
544         /* The socket is now unlocked */
545
546         rxrpc_put_peer(cp.peer, rxrpc_peer_put_discard_tmp);
547         _leave(" = %p\n", call);
548         return call;
549 }
550
551 /*
552  * send a message forming part of a client call through an RxRPC socket
553  * - caller holds the socket locked
554  * - the socket may be either a client socket or a server socket
555  */
556 int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
557         __releases(&rx->sk.sk_lock.slock)
558 {
559         enum rxrpc_call_state state;
560         struct rxrpc_call *call;
561         unsigned long now, j;
562         bool dropped_lock = false;
563         int ret;
564
565         struct rxrpc_send_params p = {
566                 .call.tx_total_len      = -1,
567                 .call.user_call_ID      = 0,
568                 .call.nr_timeouts       = 0,
569                 .call.interruptibility  = RXRPC_INTERRUPTIBLE,
570                 .abort_code             = 0,
571                 .command                = RXRPC_CMD_SEND_DATA,
572                 .exclusive              = false,
573                 .upgrade                = false,
574         };
575
576         _enter("");
577
578         ret = rxrpc_sendmsg_cmsg(msg, &p);
579         if (ret < 0)
580                 goto error_release_sock;
581
582         if (p.command == RXRPC_CMD_CHARGE_ACCEPT) {
583                 ret = -EINVAL;
584                 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
585                         goto error_release_sock;
586                 ret = rxrpc_user_charge_accept(rx, p.call.user_call_ID);
587                 goto error_release_sock;
588         }
589
590         call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);
591         if (!call) {
592                 ret = -EBADSLT;
593                 if (p.command != RXRPC_CMD_SEND_DATA)
594                         goto error_release_sock;
595                 call = rxrpc_new_client_call_for_sendmsg(rx, msg, &p);
596                 /* The socket is now unlocked... */
597                 if (IS_ERR(call))
598                         return PTR_ERR(call);
599                 /* ... and we have the call lock. */
600                 ret = 0;
601                 if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE)
602                         goto out_put_unlock;
603         } else {
604                 switch (READ_ONCE(call->state)) {
605                 case RXRPC_CALL_UNINITIALISED:
606                 case RXRPC_CALL_CLIENT_AWAIT_CONN:
607                 case RXRPC_CALL_SERVER_PREALLOC:
608                 case RXRPC_CALL_SERVER_SECURING:
609                         rxrpc_put_call(call, rxrpc_call_put_sendmsg);
610                         ret = -EBUSY;
611                         goto error_release_sock;
612                 default:
613                         break;
614                 }
615
616                 ret = mutex_lock_interruptible(&call->user_mutex);
617                 release_sock(&rx->sk);
618                 if (ret < 0) {
619                         ret = -ERESTARTSYS;
620                         goto error_put;
621                 }
622
623                 if (p.call.tx_total_len != -1) {
624                         ret = -EINVAL;
625                         if (call->tx_total_len != -1 ||
626                             call->tx_pending ||
627                             call->tx_top != 0)
628                                 goto error_put;
629                         call->tx_total_len = p.call.tx_total_len;
630                 }
631         }
632
633         switch (p.call.nr_timeouts) {
634         case 3:
635                 j = msecs_to_jiffies(p.call.timeouts.normal);
636                 if (p.call.timeouts.normal > 0 && j == 0)
637                         j = 1;
638                 WRITE_ONCE(call->next_rx_timo, j);
639                 fallthrough;
640         case 2:
641                 j = msecs_to_jiffies(p.call.timeouts.idle);
642                 if (p.call.timeouts.idle > 0 && j == 0)
643                         j = 1;
644                 WRITE_ONCE(call->next_req_timo, j);
645                 fallthrough;
646         case 1:
647                 if (p.call.timeouts.hard > 0) {
648                         j = msecs_to_jiffies(p.call.timeouts.hard);
649                         now = jiffies;
650                         j += now;
651                         WRITE_ONCE(call->expect_term_by, j);
652                         rxrpc_reduce_call_timer(call, j, now,
653                                                 rxrpc_timer_set_for_hard);
654                 }
655                 break;
656         }
657
658         state = READ_ONCE(call->state);
659         _debug("CALL %d USR %lx ST %d on CONN %p",
660                call->debug_id, call->user_call_ID, state, call->conn);
661
662         if (state >= RXRPC_CALL_COMPLETE) {
663                 /* it's too late for this call */
664                 ret = -ESHUTDOWN;
665         } else if (p.command == RXRPC_CMD_SEND_ABORT) {
666                 ret = 0;
667                 if (rxrpc_abort_call("CMD", call, 0, p.abort_code, -ECONNABORTED))
668                         ret = rxrpc_send_abort_packet(call);
669         } else if (p.command != RXRPC_CMD_SEND_DATA) {
670                 ret = -EINVAL;
671         } else {
672                 ret = rxrpc_send_data(rx, call, msg, len, NULL, &dropped_lock);
673         }
674
675 out_put_unlock:
676         if (!dropped_lock)
677                 mutex_unlock(&call->user_mutex);
678 error_put:
679         rxrpc_put_call(call, rxrpc_call_put_sendmsg);
680         _leave(" = %d", ret);
681         return ret;
682
683 error_release_sock:
684         release_sock(&rx->sk);
685         return ret;
686 }
687
688 /**
689  * rxrpc_kernel_send_data - Allow a kernel service to send data on a call
690  * @sock: The socket the call is on
691  * @call: The call to send data through
692  * @msg: The data to send
693  * @len: The amount of data to send
694  * @notify_end_tx: Notification that the last packet is queued.
695  *
696  * Allow a kernel service to send data on a call.  The call must be in an state
697  * appropriate to sending data.  No control data should be supplied in @msg,
698  * nor should an address be supplied.  MSG_MORE should be flagged if there's
699  * more data to come, otherwise this data will end the transmission phase.
700  */
701 int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
702                            struct msghdr *msg, size_t len,
703                            rxrpc_notify_end_tx_t notify_end_tx)
704 {
705         bool dropped_lock = false;
706         int ret;
707
708         _enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]);
709
710         ASSERTCMP(msg->msg_name, ==, NULL);
711         ASSERTCMP(msg->msg_control, ==, NULL);
712
713         mutex_lock(&call->user_mutex);
714
715         _debug("CALL %d USR %lx ST %d on CONN %p",
716                call->debug_id, call->user_call_ID, call->state, call->conn);
717
718         switch (READ_ONCE(call->state)) {
719         case RXRPC_CALL_CLIENT_SEND_REQUEST:
720         case RXRPC_CALL_SERVER_ACK_REQUEST:
721         case RXRPC_CALL_SERVER_SEND_REPLY:
722                 ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
723                                       notify_end_tx, &dropped_lock);
724                 break;
725         case RXRPC_CALL_COMPLETE:
726                 read_lock(&call->state_lock);
727                 ret = call->error;
728                 read_unlock(&call->state_lock);
729                 break;
730         default:
731                 /* Request phase complete for this client call */
732                 trace_rxrpc_rx_eproto(call, 0, tracepoint_string("late_send"));
733                 ret = -EPROTO;
734                 break;
735         }
736
737         if (!dropped_lock)
738                 mutex_unlock(&call->user_mutex);
739         _leave(" = %d", ret);
740         return ret;
741 }
742 EXPORT_SYMBOL(rxrpc_kernel_send_data);
743
744 /**
745  * rxrpc_kernel_abort_call - Allow a kernel service to abort a call
746  * @sock: The socket the call is on
747  * @call: The call to be aborted
748  * @abort_code: The abort code to stick into the ABORT packet
749  * @error: Local error value
750  * @why: 3-char string indicating why.
751  *
752  * Allow a kernel service to abort a call, if it's still in an abortable state
753  * and return true if the call was aborted, false if it was already complete.
754  */
755 bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
756                              u32 abort_code, int error, const char *why)
757 {
758         bool aborted;
759
760         _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
761
762         mutex_lock(&call->user_mutex);
763
764         aborted = rxrpc_abort_call(why, call, 0, abort_code, error);
765         if (aborted)
766                 rxrpc_send_abort_packet(call);
767
768         mutex_unlock(&call->user_mutex);
769         return aborted;
770 }
771 EXPORT_SYMBOL(rxrpc_kernel_abort_call);
772
773 /**
774  * rxrpc_kernel_set_tx_length - Set the total Tx length on a call
775  * @sock: The socket the call is on
776  * @call: The call to be informed
777  * @tx_total_len: The amount of data to be transmitted for this call
778  *
779  * Allow a kernel service to set the total transmit length on a call.  This
780  * allows buffer-to-packet encrypt-and-copy to be performed.
781  *
782  * This function is primarily for use for setting the reply length since the
783  * request length can be set when beginning the call.
784  */
785 void rxrpc_kernel_set_tx_length(struct socket *sock, struct rxrpc_call *call,
786                                 s64 tx_total_len)
787 {
788         WARN_ON(call->tx_total_len != -1);
789         call->tx_total_len = tx_total_len;
790 }
791 EXPORT_SYMBOL(rxrpc_kernel_set_tx_length);