net/smc: rename connection index to RMBE index
[linux-2.6-block.git] net/smc/smc_tx.c

// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/tcp.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"

#define SMC_TX_WORK_DELAY	HZ
#define SMC_TX_CORK_DELAY	(HZ >> 2)	/* 250 ms */

/***************************** sndbuf producer *******************************/

/* callback implementation for sk.sk_write_space()
 * to wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
	struct socket *sock = sk->sk_socket;
	struct smc_sock *smc = smc_sk(sk);
	struct socket_wq *wq;

	/* similar to sk_stream_write_space */
	if (atomic_read(&smc->conn.sndbuf_space) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);
		rcu_read_lock();
		wq = rcu_dereference(sk->sk_wq);
		if (skwq_has_sleeper(wq))
			wake_up_interruptible_poll(&wq->wait,
						   EPOLLOUT | EPOLLWRNORM |
						   EPOLLWRBAND);
		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
			sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
		rcu_read_unlock();
	}
}

/* Wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
	if (smc->sk.sk_socket &&
	    test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
		smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space available */
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool noblock;
	long timeo;
	int rc = 0;

	/* similar to sk_stream_wait_memory */
	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
	noblock = timeo ? false : true;
	add_wait_queue(sk_sleep(sk), &wait);
	while (1) {
		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (sk->sk_err ||
		    (sk->sk_shutdown & SEND_SHUTDOWN) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
			rc = -EPIPE;
			break;
		}
		if (smc_cdc_rxed_any_close(conn)) {
			rc = -ECONNRESET;
			break;
		}
		if (!timeo) {
			if (noblock)
				set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			rc = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			rc = sock_intr_errno(timeo);
			break;
		}
		sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (atomic_read(&conn->sndbuf_space))
			break; /* at least 1 byte of free space available */
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk_wait_event(sk, &timeo,
			      sk->sk_err ||
			      (sk->sk_shutdown & SEND_SHUTDOWN) ||
			      smc_cdc_rxed_any_close(conn) ||
			      atomic_read(&conn->sndbuf_space),
			      &wait);
	}
	remove_wait_queue(sk_sleep(sk), &wait);
	return rc;
}

static bool smc_tx_is_corked(struct smc_sock *smc)
{
	struct tcp_sock *tp = tcp_sk(smc->clcsock->sk);

	return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
}

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
	size_t copylen, send_done = 0, send_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor prep;
	struct sock *sk = &smc->sk;
	char *sndbuf_base;
	int tx_cnt_prep;
	int writespace;
	int rc, chunk;

	/* This should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
		rc = -EPIPE;
		goto out_err;
	}

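	/* The send buffer is a ring buffer: each loop iteration copies as
	 * much of the remaining user data as currently fits into it and, if
	 * the buffer is full, waits in smc_tx_wait_memory() (or fails with
	 * -EAGAIN for a non-blocking socket) before trying again.
	 */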
	while (msg_data_left(msg)) {
		if (sk->sk_state == SMC_INIT)
			return -ENOTCONN;
		if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
		    (smc->sk.sk_err == ECONNABORTED) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
			return -EPIPE;
		if (smc_cdc_rxed_any_close(conn))
			return send_done ?: -ECONNRESET;

		if (!atomic_read(&conn->sndbuf_space)) {
			rc = smc_tx_wait_memory(smc, msg->msg_flags);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			continue;
		}

		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after smc_tx_wait_memory above */
		writespace = atomic_read(&conn->sndbuf_space);
		/* not more than what user space asked for */
		copylen = min_t(size_t, send_remaining, writespace);
		/* determine start of sndbuf */
		sndbuf_base = conn->sndbuf_desc->cpu_addr;
		smc_curs_write(&prep,
			       smc_curs_read(&conn->tx_curs_prep, conn),
			       conn);
		tx_cnt_prep = prep.count;
		/* determine chunks where to write into sndbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t, copylen, conn->sndbuf_desc->len -
				  tx_cnt_prep);
		chunk_len_sum = chunk_len;
		chunk_off = tx_cnt_prep;
		smc_sndbuf_sync_sg_for_cpu(conn);
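		/* copy in at most two chunks: a single chunk if the data fits
		 * up to the end of the ring buffer, otherwise a second chunk
		 * that wraps around to the start of the buffer
		 */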
		for (chunk = 0; chunk < 2; chunk++) {
			rc = memcpy_from_msg(sndbuf_base + chunk_off,
					     msg, chunk_len);
			if (rc) {
				smc_sndbuf_sync_sg_for_device(conn);
				if (send_done)
					return send_done;
				goto out_err;
			}
			send_done += chunk_len;
			send_remaining -= chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in send ring buffer */
		}
		smc_sndbuf_sync_sg_for_device(conn);
		/* update cursors */
		smc_curs_add(conn->sndbuf_desc->len, &prep, copylen);
		smc_curs_write(&conn->tx_curs_prep,
			       smc_curs_read(&prep, conn),
			       conn);
		/* increased in send tasklet smc_cdc_tx_handler() */
		smp_mb__before_atomic();
		atomic_sub(copylen, &conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
		smp_mb__after_atomic();
		/* since we just produced more new data into sndbuf,
		 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
		 */
		if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) &&
		    (atomic_read(&conn->sndbuf_space) >
						(conn->sndbuf_desc->len >> 1)))
			/* for a corked socket defer the RDMA writes if there
			 * is still sufficient sndbuf_space available
			 */
			schedule_delayed_work(&conn->tx_work,
					      SMC_TX_CORK_DELAY);
		else
			smc_tx_sndbuf_nonempty(conn);
	} /* while (msg_data_left(msg)) */

	return send_done;

out_err:
	rc = sk_stream_error(sk, msg->msg_flags, rc);
	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(rc == -EAGAIN))
		sk->sk_write_space(sk);
	return rc;
}

/***************************** sndbuf consumer *******************************/

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
			     int num_sges, struct ib_sge sges[])
{
	struct smc_link_group *lgr = conn->lgr;
	struct ib_send_wr *failed_wr = NULL;
	struct ib_rdma_wr rdma_wr;
	struct smc_link *link;
	int rc;

	memset(&rdma_wr, 0, sizeof(rdma_wr));
	link = &lgr->lnk[SMC_SINGLE_LINK];
	rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link);
	rdma_wr.wr.sg_list = sges;
	rdma_wr.wr.num_sge = num_sges;
	rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
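	/* the remote address is the start of the peer's registered RMB plus
	 * the offset of this connection's RMBE within it (peer_rmbe_idx seems
	 * to be 1-based, hence the "- 1") plus the offset within the RMBE
	 */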
	rdma_wr.remote_addr =
		lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr +
		/* RMBE within RMB */
		((conn->peer_rmbe_idx - 1) * conn->peer_rmbe_size) +
		/* offset within RMBE */
		peer_rmbe_offset;
	rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey;
	rc = ib_post_send(link->roce_qp, &rdma_wr.wr, &failed_wr);
	if (rc) {
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
		smc_lgr_terminate(lgr);
	}
	return rc;
}

/* sndbuf consumer */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
					  union smc_host_cursor *prod,
					  union smc_host_cursor *sent,
					  size_t len)
{
	smc_curs_add(conn->peer_rmbe_size, prod, len);
	/* increased in recv tasklet smc_cdc_msg_rcv() */
	smp_mb__before_atomic();
	/* data in flight reduces usable snd_wnd */
	atomic_sub(len, &conn->peer_rmbe_space);
	/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
	smp_mb__after_atomic();
	smc_curs_add(conn->sndbuf_desc->len, sent, len);
}

/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
 * usable snd_wnd as max transmit
 */
static int smc_tx_rdma_writes(struct smc_connection *conn)
{
	size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
	size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
	union smc_host_cursor sent, prep, prod, cons;
	struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
	struct smc_link_group *lgr = conn->lgr;
	int to_send, rmbespace;
	struct smc_link *link;
	dma_addr_t dma_addr;
	int num_sges;
	int rc;

	/* source: sndbuf */
	smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
	smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
	/* cf. wmem_alloc - (snd_max - snd_una) */
	to_send = smc_curs_diff(conn->sndbuf_desc->len, &sent, &prep);
	if (to_send <= 0)
		return 0;

	/* destination: RMBE */
	/* cf. snd_wnd */
	rmbespace = atomic_read(&conn->peer_rmbe_space);
	if (rmbespace <= 0)
		return 0;
	smc_curs_write(&prod,
		       smc_curs_read(&conn->local_tx_ctrl.prod, conn),
		       conn);
	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_rx_ctrl.cons, conn),
		       conn);

	/* if usable snd_wnd closes ask peer to advertise once it opens again */
	conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
	/* cf. usable snd_wnd */
	len = min(to_send, rmbespace);

	/* initialize variables for first iteration of subsequent nested loop */
	link = &lgr->lnk[SMC_SINGLE_LINK];
	dst_off = prod.count;
	if (prod.wrap == cons.wrap) {
		/* the filled destination area is unwrapped,
		 * hence the available free destination space is wrapped
		 * and we need 2 destination chunks of sum len; start with 1st
		 * which is limited by what's available in sndbuf
		 */
		dst_len = min_t(size_t,
				conn->peer_rmbe_size - prod.count, len);
	} else {
		/* the filled destination area is wrapped,
		 * hence the available free destination space is unwrapped
		 * and we need a single destination chunk of entire len
		 */
		dst_len = len;
	}
	dst_len_sum = dst_len;
	src_off = sent.count;
	/* dst_len determines the maximum src_len */
	if (sent.count + dst_len <= conn->sndbuf_desc->len) {
		/* unwrapped src case: single chunk of entire dst_len */
		src_len = dst_len;
	} else {
		/* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
		src_len = conn->sndbuf_desc->len - sent.count;
	}
	src_len_sum = src_len;
	dma_addr = sg_dma_address(conn->sndbuf_desc->sgt[SMC_SINGLE_LINK].sgl);
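	/* both the local sndbuf and the peer RMBE are ring buffers, so the
	 * transfer may need up to two destination chunks, each gathered from
	 * up to two source chunks; one RDMA write is posted per destination
	 * chunk
	 */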
	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
		num_sges = 0;
		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
			sges[srcchunk].addr = dma_addr + src_off;
			sges[srcchunk].length = src_len;
			sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
			num_sges++;
			src_off += src_len;
			if (src_off >= conn->sndbuf_desc->len)
				src_off -= conn->sndbuf_desc->len;
						/* modulo in send ring */
			if (src_len_sum == dst_len)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			src_len = dst_len - src_len; /* remainder */
			src_len_sum += src_len;
		}
		rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
		if (rc)
			return rc;
		if (dst_len_sum == len)
			break; /* either on 1st or 2nd iteration */
		/* prepare next (== 2nd) iteration */
		dst_off = 0; /* modulo offset in RMBE ring buffer */
		dst_len = len - dst_len; /* remainder */
		dst_len_sum += dst_len;
		src_len = min_t(int,
				dst_len, conn->sndbuf_desc->len - sent.count);
		src_len_sum = src_len;
	}

	smc_tx_advance_cursors(conn, &prod, &sent, len);
	/* update connection's cursors with advanced local cursors */
	smc_curs_write(&conn->local_tx_ctrl.prod,
		       smc_curs_read(&prod, conn),
		       conn);
	/* dst: peer RMBE */
	smc_curs_write(&conn->tx_curs_sent,
		       smc_curs_read(&sent, conn),
		       conn);
	/* src: local sndbuf */

	return 0;
}

/* Wakeup sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; usable snd_wnd as max transmit
 */
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int rc;

	spin_lock_bh(&conn->send_lock);
	rc = smc_cdc_get_free_slot(conn, &wr_buf, &pend);
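	/* a negative rc of -EBUSY means no free slot for the CDC message is
	 * available right now; unless the connection is being aborted, the
	 * transmit is simply retried later from the delayed tx worker
	 */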
	if (rc < 0) {
		if (rc == -EBUSY) {
			struct smc_sock *smc =
				container_of(conn, struct smc_sock, conn);

			if (smc->sk.sk_err == ECONNABORTED) {
				rc = sock_error(&smc->sk);
				goto out_unlock;
			}
			rc = 0;
			if (conn->alert_token_local) /* connection healthy */
				mod_delayed_work(system_wq, &conn->tx_work,
						 SMC_TX_WORK_DELAY);
		}
		goto out_unlock;
	}

	rc = smc_tx_rdma_writes(conn);
	if (rc) {
		smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
				   (struct smc_wr_tx_pend_priv *)pend);
		goto out_unlock;
	}

	rc = smc_cdc_msg_send(conn, wr_buf, pend);

out_unlock:
	spin_unlock_bh(&conn->send_lock);
	return rc;
}

/* Wakeup sndbuf consumers from process context
 * since there is more data to transmit
 */
void smc_tx_work(struct work_struct *work)
{
	struct smc_connection *conn = container_of(to_delayed_work(work),
						   struct smc_connection,
						   tx_work);
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
	int rc;

	lock_sock(&smc->sk);
	if (smc->sk.sk_err ||
	    !conn->alert_token_local ||
	    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
		goto out;

	rc = smc_tx_sndbuf_nonempty(conn);
	if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;

out:
	release_sock(&smc->sk);
}

void smc_tx_consumer_update(struct smc_connection *conn)
{
	union smc_host_cursor cfed, cons;
	int to_confirm;

	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
		       conn);
	smc_curs_write(&cfed,
		       smc_curs_read(&conn->rx_curs_confirmed, conn),
		       conn);
	to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);

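	/* send a consumer cursor update to the peer if it explicitly
	 * requested one, or if enough bytes have been consumed since the
	 * last update: more than rmbe_update_limit and either more than half
	 * of the RMB or while the peer is write-blocked
	 */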
	if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
	    ((to_confirm > conn->rmbe_update_limit) &&
	     ((to_confirm > (conn->rmb_desc->len / 2)) ||
	      conn->local_rx_ctrl.prod_flags.write_blocked))) {
		if ((smc_cdc_get_slot_and_msg_send(conn) < 0) &&
		    conn->alert_token_local) { /* connection healthy */
			schedule_delayed_work(&conn->tx_work,
					      SMC_TX_WORK_DELAY);
			return;
		}
		smc_curs_write(&conn->rx_curs_confirmed,
			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
			       conn);
		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
	}
	if (conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
	smc->sk.sk_write_space = smc_tx_write_space;
}