xfs: convert CIL busy extents to per-cpu
[linux-block.git] / fs / xfs / xfs_log_cil.c
0b61f8a4 1// SPDX-License-Identifier: GPL-2.0
71e330b5
DC
2/*
3 * Copyright (c) 2010 Red Hat, Inc. All Rights Reserved.
71e330b5
DC
4 */
5
6#include "xfs.h"
7#include "xfs_fs.h"
4fb6e8ad 8#include "xfs_format.h"
239880ef 9#include "xfs_log_format.h"
70a9883c 10#include "xfs_shared.h"
239880ef 11#include "xfs_trans_resv.h"
71e330b5 12#include "xfs_mount.h"
efc27b52 13#include "xfs_extent_busy.h"
239880ef
DC
14#include "xfs_trans.h"
15#include "xfs_trans_priv.h"
16#include "xfs_log.h"
17#include "xfs_log_priv.h"
4560e78f
CH
18#include "xfs_trace.h"
19
20struct workqueue_struct *xfs_discard_wq;
71e330b5 21
71e330b5
DC
22/*
23 * Allocate a new ticket. Failing to get a new ticket makes it really hard to
24 * recover, so we don't allow failure here. Also, we allocate in a context that
25 * we don't want to be issuing transactions from, so we need to tell the
26 * allocation code this as well.
27 *
28 * We don't reserve any space for the ticket - we are going to steal whatever
29 * space we require from transactions as they commit. To ensure we reserve all
30 * the space required, we need to set the current reservation of the ticket to
31 * zero so that we know to steal the initial transaction overhead from the
32 * first transaction commit.
33 */
34static struct xlog_ticket *
35xlog_cil_ticket_alloc(
f7bdf03a 36 struct xlog *log)
71e330b5
DC
37{
38 struct xlog_ticket *tic;
39
c7610dce 40 tic = xlog_ticket_alloc(log, 0, 1, 0);
71e330b5
DC
41
42 /*
43 * set the current reservation to zero so we know to steal the basic
44 * transaction overhead reservation from the first transaction commit.
45 */
46 tic->t_curr_res = 0;
31151cc3 47 tic->t_iclog_hdrs = 0;
71e330b5
DC
48 return tic;
49}
50
31151cc3
DC
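/*
 * Set the maximum number of iclog headers a full CIL checkpoint can consume.
 * Each iclog carries (l_iclog_size - l_iclog_hsize) bytes of checkpoint data,
 * so this is the blocking space limit divided by the usable space per iclog.
 */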
51static inline void
52xlog_cil_set_iclog_hdr_count(struct xfs_cil *cil)
53{
54 struct xlog *log = cil->xc_log;
55
56 atomic_set(&cil->xc_iclog_hdrs,
57 (XLOG_CIL_BLOCKING_SPACE_LIMIT(log) /
58 (log->l_iclog_size - log->l_iclog_hsize)));
59}
60
22b1afc5
DC
61/*
62 * Check if the current log item was first committed in this sequence.
63 * We can't rely on just the log item being in the CIL, we have to check
64 * the recorded commit sequence number.
65 *
66 * Note: for this to be used in a non-racy manner, it has to be called with
67 * CIL flushing locked out. As a result, it should only be used during the
68 * transaction commit process when deciding what to format into the item.
69 */
70static bool
71xlog_item_in_current_chkpt(
72 struct xfs_cil *cil,
73 struct xfs_log_item *lip)
74{
88591e7f 75 if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
22b1afc5
DC
76 return false;
77
78 /*
79 * li_seq is written on the first commit of a log item to record the
80 * first checkpoint it is written to. Hence if it is different to the
81 * current sequence, we're in a new checkpoint.
82 */
83 return lip->li_seq == READ_ONCE(cil->xc_current_sequence);
84}
85
86bool
87xfs_log_item_in_current_chkpt(
88 struct xfs_log_item *lip)
89{
90 return xlog_item_in_current_chkpt(lip->li_log->l_cilp, lip);
91}
92
39823d0f
DC
93/*
94 * Unavoidable forward declaration - xlog_cil_push_work() calls
95 * xlog_cil_ctx_alloc() itself.
96 */
97static void xlog_cil_push_work(struct work_struct *work);
98
99static struct xfs_cil_ctx *
100xlog_cil_ctx_alloc(void)
101{
102 struct xfs_cil_ctx *ctx;
103
104 ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
105 INIT_LIST_HEAD(&ctx->committing);
106 INIT_LIST_HEAD(&ctx->busy_extents);
107 INIT_WORK(&ctx->push_work, xlog_cil_push_work);
108 return ctx;
109}
110
7c8ade21
DC
111/*
112 * Aggregate the CIL per cpu structures into global counts, lists, etc and
113 * clear the percpu state ready for the next context to use. This is called
114 * from the push code with the context lock held exclusively, hence nothing else
115 * will be accessing or modifying the per-cpu counters.
116 */
117static void
118xlog_cil_push_pcp_aggregate(
119 struct xfs_cil *cil,
120 struct xfs_cil_ctx *ctx)
121{
122 struct xlog_cil_pcp *cilpcp;
123 int cpu;
124
125 for_each_online_cpu(cpu) {
126 cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
127
1dd2a2c1
DC
128 ctx->ticket->t_curr_res += cilpcp->space_reserved;
129 cilpcp->space_reserved = 0;
130
df7a4a21
DC
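		/*
		 * Move any busy extents accumulated on this CPU onto the
		 * context's global busy extent list for this checkpoint.
		 */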
131 if (!list_empty(&cilpcp->busy_extents)) {
132 list_splice_init(&cilpcp->busy_extents,
133 &ctx->busy_extents);
134 }
135
7c8ade21
DC
136 /*
137 * We're in the middle of switching cil contexts. Reset the
138 * counter we use to detect when the current context is nearing
139 * full.
140 */
141 cilpcp->space_used = 0;
142 }
143}
144
145/*
146 * Aggregate the CIL per-cpu space used counters into the global atomic value.
147 * This is called when the per-cpu counter aggregation will first pass the soft
148 * limit threshold so we can switch to atomic counter aggregation for accurate
149 * detection of hard limit traversal.
150 */
151static void
152xlog_cil_insert_pcp_aggregate(
153 struct xfs_cil *cil,
154 struct xfs_cil_ctx *ctx)
155{
156 struct xlog_cil_pcp *cilpcp;
157 int cpu;
158 int count = 0;
159
160 /* Trigger atomic updates then aggregate only for the first caller */
161 if (!test_and_clear_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags))
162 return;
163
164 for_each_online_cpu(cpu) {
165 int old, prev;
166
167 cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
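		/*
		 * Atomically zero this CPU's space_used counter, retrying if
		 * another CPU modifies it between the read and the cmpxchg.
		 */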
168 do {
169 old = cilpcp->space_used;
170 prev = cmpxchg(&cilpcp->space_used, old, 0);
171 } while (old != prev);
172 count += old;
173 }
174 atomic_add(count, &ctx->space_used);
175}
176
39823d0f
DC
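/*
 * Switch the CIL to a new, empty context. Reset the iclog header budget and
 * the EMPTY/PCP_SPACE accounting flags, bump the current sequence number and
 * make the new context the active one.
 */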
177static void
178xlog_cil_ctx_switch(
179 struct xfs_cil *cil,
180 struct xfs_cil_ctx *ctx)
181{
31151cc3 182 xlog_cil_set_iclog_hdr_count(cil);
88591e7f 183 set_bit(XLOG_CIL_EMPTY, &cil->xc_flags);
7c8ade21 184 set_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags);
39823d0f
DC
185 ctx->sequence = ++cil->xc_current_sequence;
186 ctx->cil = cil;
187 cil->xc_ctx = ctx;
188}
189
71e330b5
DC
190/*
191 * After the first stage of log recovery is done, we know where the head and
192 * tail of the log are. We need this log initialisation done before we can
193 * initialise the first CIL checkpoint context.
194 *
195 * Here we allocate a log ticket to track space usage during a CIL push. This
196 * ticket is passed to xlog_write() directly so that we don't slowly leak log
197 * space by failing to account for space used by log headers and additional
198 * region headers for split regions.
199 */
200void
201xlog_cil_init_post_recovery(
f7bdf03a 202 struct xlog *log)
71e330b5 203{
71e330b5
DC
204 log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
205 log->l_cilp->xc_ctx->sequence = 1;
31151cc3 206 xlog_cil_set_iclog_hdr_count(log->l_cilp);
71e330b5
DC
207}
208
b1c5ebb2
DC
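/*
 * Space needed for the log vector header and the iovec array, rounded up to a
 * 64-bit boundary so that the data buffer which follows is naturally aligned.
 */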
209static inline int
210xlog_cil_iovec_space(
211 uint niovecs)
212{
213 return round_up((sizeof(struct xfs_log_vec) +
214 niovecs * sizeof(struct xfs_log_iovec)),
215 sizeof(uint64_t));
216}
217
218/*
219 * Allocate or pin log vector buffers for CIL insertion.
220 *
221 * The CIL currently uses disposable buffers for copying a snapshot of the
222 * modified items into the log during a push. The biggest problem with this is
223 * the requirement to allocate the disposable buffer during the commit if:
224 * a) it does not exist; or
225 * b) it is too small
226 *
227 * If we do this allocation within xlog_cil_insert_format_items(), it is done
228 * under the xc_ctx_lock, which means that a CIL push cannot occur during
229 * the memory allocation. This means that we have a potential deadlock situation
230 * under low memory conditions when we have lots of dirty metadata pinned in
231 * the CIL and we need a CIL commit to occur to free memory.
232 *
233 * To avoid this, we need to move the memory allocation outside the
234 * xc_ctx_lock, but because the log vector buffers are disposable, that opens
235 * up a TOCTOU race condition w.r.t. the CIL committing and removing the log
236 * vector buffers between the check and the formatting of the item into the
237 * log vector buffer within the xc_ctx_lock.
238 *
239 * Because the log vector buffer needs to be unchanged during the CIL push
240 * process, we cannot share the buffer between the transaction commit (which
241 * modifies the buffer) and the CIL push context that is writing the changes
242 * into the log. This means skipping preallocation of buffer space is
243 * unreliable, but we most definitely do not want to be allocating and freeing
244 * buffers unnecessarily during commits when overwrites can be done safely.
245 *
246 * The simplest solution to this problem is to allocate a shadow buffer when a
247 * log item is committed for the second time, and then to only use this buffer
248 * if necessary. The buffer can remain attached to the log item until such time
249 * as it is needed, and this is the buffer that is reallocated to match the size of
250 * the incoming modification. Then during the formatting of the item we can swap
251 * the active buffer with the new one if we can't reuse the existing buffer. We
252 * don't free the old buffer as it may be reused on the next modification if
253 * its size is right, otherwise we'll free and reallocate it at that point.
254 *
255 * This function builds a vector for the changes in each log item in the
256 * transaction. It then works out the length of the buffer needed for each log
257 * item, allocates them and attaches the vector to the log item in preparation
258 * for the formatting step which occurs under the xc_ctx_lock.
259 *
260 * While this means the memory footprint goes up, it avoids the repeated
261 * alloc/free pattern that repeated modifications of an item would otherwise
262 * cause, and hence minimises the CPU overhead of such behaviour.
263 */
264static void
265xlog_cil_alloc_shadow_bufs(
266 struct xlog *log,
267 struct xfs_trans *tp)
268{
e6631f85 269 struct xfs_log_item *lip;
b1c5ebb2 270
e6631f85 271 list_for_each_entry(lip, &tp->t_items, li_trans) {
b1c5ebb2
DC
272 struct xfs_log_vec *lv;
273 int niovecs = 0;
274 int nbytes = 0;
275 int buf_size;
276 bool ordered = false;
277
278 /* Skip items which aren't dirty in this transaction. */
e6631f85 279 if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
b1c5ebb2
DC
280 continue;
281
282 /* get number of vecs and size of data to be stored */
283 lip->li_ops->iop_size(lip, &niovecs, &nbytes);
284
285 /*
286 * Ordered items need to be tracked but we do not wish to write
287 * them. We need a logvec to track the object, but we do not
288 * need an iovec or buffer to be allocated for copying data.
289 */
290 if (niovecs == XFS_LOG_VEC_ORDERED) {
291 ordered = true;
292 niovecs = 0;
293 nbytes = 0;
294 }
295
296 /*
8d547cf9
DC
297 * We 64-bit align the length of each iovec so that the start of
298 * the next one is naturally aligned. We'll need to account for
299 * that slack space here.
300 *
301 * We also add the xlog_op_header to each region when
302 * formatting, but that's not accounted to the size of the item
303 * at this point. Hence we'll need an additional number of bytes
304 * for each vector to hold an opheader.
305 *
306 * Then round nbytes up to 64-bit alignment so that the initial
307 * buffer alignment is easy to calculate and verify.
b1c5ebb2 308 */
8d547cf9
DC
309 nbytes += niovecs *
310 (sizeof(uint64_t) + sizeof(struct xlog_op_header));
b1c5ebb2
DC
311 nbytes = round_up(nbytes, sizeof(uint64_t));
312
313 /*
314 * The data buffer needs to start 64-bit aligned, so round up
315 * that space to ensure we can align it appropriately and not
316 * overrun the buffer.
317 */
318 buf_size = nbytes + xlog_cil_iovec_space(niovecs);
319
320 /*
321 * if we have no shadow buffer, or it is too small, we need to
322 * reallocate it.
323 */
324 if (!lip->li_lv_shadow ||
325 buf_size > lip->li_lv_shadow->lv_size) {
b1c5ebb2
DC
326 /*
327 * We free and allocate here as a realloc would copy
8dc9384b 328 * unnecessary data. We don't use kvzalloc() for the
b1c5ebb2
DC
329 * same reason - we don't need to zero the data area in
330 * the buffer, only the log vector header and the iovec
331 * storage.
332 */
333 kmem_free(lip->li_lv_shadow);
45ff8b47 334 lv = xlog_kvmalloc(buf_size);
b1c5ebb2 335
b1c5ebb2
DC
336 memset(lv, 0, xlog_cil_iovec_space(niovecs));
337
338 lv->lv_item = lip;
339 lv->lv_size = buf_size;
340 if (ordered)
341 lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
342 else
343 lv->lv_iovecp = (struct xfs_log_iovec *)&lv[1];
344 lip->li_lv_shadow = lv;
345 } else {
346 /* same or smaller, optimise common overwrite case */
347 lv = lip->li_lv_shadow;
348 if (ordered)
349 lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
350 else
351 lv->lv_buf_len = 0;
352 lv->lv_bytes = 0;
353 lv->lv_next = NULL;
354 }
355
356 /* Ensure the lv is set up according to ->iop_size */
357 lv->lv_niovecs = niovecs;
358
359 /* The allocated data region lies beyond the iovec region */
360 lv->lv_buf = (char *)lv + xlog_cil_iovec_space(niovecs);
361 }
362
363}
364
991aaf65
DC
365/*
366 * Prepare the log item for insertion into the CIL. Calculate the difference in
593e3439 367 * log space it will consume, and if it is a new item pin it as well.
991aaf65
DC
368 */
369STATIC void
370xfs_cil_prepare_item(
371 struct xlog *log,
372 struct xfs_log_vec *lv,
373 struct xfs_log_vec *old_lv,
593e3439 374 int *diff_len)
991aaf65
DC
375{
376 /* Account for the new LV being passed in */
593e3439 377 if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
110dc24a 378 *diff_len += lv->lv_bytes;
991aaf65
DC
379
380 /*
381 * If there is no old LV, this is the first time we've seen the item in
382 * this CIL context and so we need to pin it. If we are replacing the
b1c5ebb2
DC
383 * old_lv, then remove the space it accounts for and make it the shadow
384 * buffer for later freeing. In both cases we are now switching to the
b63da6c8 385 * shadow buffer, so update the pointer to it appropriately.
991aaf65 386 */
b1c5ebb2 387 if (!old_lv) {
e8b78db7
CH
388 if (lv->lv_item->li_ops->iop_pin)
389 lv->lv_item->li_ops->iop_pin(lv->lv_item);
b1c5ebb2
DC
390 lv->lv_item->li_lv_shadow = NULL;
391 } else if (old_lv != lv) {
991aaf65
DC
392 ASSERT(lv->lv_buf_len != XFS_LOG_VEC_ORDERED);
393
110dc24a 394 *diff_len -= old_lv->lv_bytes;
b1c5ebb2 395 lv->lv_item->li_lv_shadow = old_lv;
991aaf65
DC
396 }
397
398 /* attach new log vector to log item */
399 lv->lv_item->li_lv = lv;
400
401 /*
402 * If this is the first time the item is being committed to the
403 * CIL, store the sequence number on the log item so we can
404 * tell in future commits whether this is the first checkpoint
405 * the item is being committed into.
406 */
407 if (!lv->lv_item->li_seq)
408 lv->lv_item->li_seq = log->l_cilp->xc_ctx->sequence;
409}
410
71e330b5
DC
411/*
412 * Format log item into a flat buffers
413 *
414 * For delayed logging, we need to hold a formatted buffer containing all the
415 * changes on the log item. This enables us to relog the item in memory and
416 * write it out asynchronously without needing to relock the object that was
417 * modified at the time it gets written into the iclog.
418 *
b1c5ebb2
DC
419 * This function takes the prepared log vectors attached to each log item, and
420 * formats the changes into the log vector buffer. The buffer it uses is
421 * dependent on the current state of the vector in the CIL - the shadow lv is
422 * guaranteed to be large enough for the current modification, but we will only
423 * use that if we can't reuse the existing lv. If we can't reuse the existing
424 * lv, then simply swap it out for the shadow lv. We don't free it - that is
425 * done lazily either by the next modification or the freeing of the log item.
71e330b5
DC
426 *
427 * We don't set up region headers during this process; we simply copy the
428 * regions into the flat buffer. We can do this because we still have to do a
429 * formatting step to write the regions into the iclog buffer. Writing the
430 * ophdrs during the iclog write means that we can support splitting large
431 * regions across iclog boundaries without needing a change in the format of the
432 * item/region encapsulation.
433 *
434 * Hence what we need to do now is rewrite the vector array to point
435 * to the copied region inside the buffer we just allocated. This allows us to
436 * format the regions into the iclog as though they are being formatted
437 * directly out of the objects themselves.
438 */
991aaf65
DC
439static void
440xlog_cil_insert_format_items(
441 struct xlog *log,
442 struct xfs_trans *tp,
593e3439 443 int *diff_len)
71e330b5 444{
e6631f85 445 struct xfs_log_item *lip;
71e330b5 446
0244b960
CH
447 /* Bail out if we didn't find a log item. */
448 if (list_empty(&tp->t_items)) {
449 ASSERT(0);
991aaf65 450 return;
0244b960
CH
451 }
452
e6631f85 453 list_for_each_entry(lip, &tp->t_items, li_trans) {
7492c5b4 454 struct xfs_log_vec *lv;
b1c5ebb2
DC
455 struct xfs_log_vec *old_lv = NULL;
456 struct xfs_log_vec *shadow;
fd63875c 457 bool ordered = false;
71e330b5 458
0244b960 459 /* Skip items which aren't dirty in this transaction. */
e6631f85 460 if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
0244b960
CH
461 continue;
462
fd63875c 463 /*
b1c5ebb2
DC
464 * The formatting size information is already attached to
465 * the shadow lv on the log item.
fd63875c 466 */
b1c5ebb2
DC
467 shadow = lip->li_lv_shadow;
468 if (shadow->lv_buf_len == XFS_LOG_VEC_ORDERED)
fd63875c 469 ordered = true;
fd63875c 470
b1c5ebb2
DC
471 /* Skip items that do not have any vectors for writing */
472 if (!shadow->lv_niovecs && !ordered)
473 continue;
0244b960 474
f5baac35 475 /* compare to existing item size */
b1c5ebb2
DC
476 old_lv = lip->li_lv;
477 if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
f5baac35
DC
478 /* same or smaller, optimise common overwrite case */
479 lv = lip->li_lv;
480 lv->lv_next = NULL;
481
482 if (ordered)
483 goto insert;
484
991aaf65
DC
485 /*
486 * set the item up as though it is a new insertion so
487 * that the space reservation accounting is correct.
488 */
110dc24a 489 *diff_len -= lv->lv_bytes;
b1c5ebb2
DC
490
491 /* Ensure the lv is set up according to ->iop_size */
492 lv->lv_niovecs = shadow->lv_niovecs;
493
494 /* reset the lv buffer information for new formatting */
495 lv->lv_buf_len = 0;
496 lv->lv_bytes = 0;
497 lv->lv_buf = (char *)lv +
498 xlog_cil_iovec_space(lv->lv_niovecs);
9597df6b 499 } else {
b1c5ebb2
DC
500 /* switch to shadow buffer! */
501 lv = shadow;
9597df6b 502 lv->lv_item = lip;
9597df6b
CH
503 if (ordered) {
504 /* track as an ordered logvec */
505 ASSERT(lip->li_lv == NULL);
9597df6b
CH
506 goto insert;
507 }
f5baac35
DC
508 }
509
3895e51f 510 ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t)));
bde7cff6 511 lip->li_ops->iop_format(lip, lv);
7492c5b4 512insert:
593e3439 513 xfs_cil_prepare_item(log, lv, old_lv, diff_len);
3b93c7aa 514 }
d1583a38
DC
515}
516
7c8ade21
DC
517/*
518 * The use of lockless waitqueue_active() requires that the caller has
519 * serialised itself against the wakeup call in xlog_cil_push_work(). That
520 * can be done by either holding the push lock or the context lock.
521 */
522static inline bool
523xlog_cil_over_hard_limit(
524 struct xlog *log,
525 int32_t space_used)
526{
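	/*
	 * Anyone sleeping on the push wait queue means the hard limit has
	 * already been activated for this context, so keep reporting it as
	 * exceeded even if usage has since dipped back below the threshold.
	 */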
527 if (waitqueue_active(&log->l_cilp->xc_push_wait))
528 return true;
529 if (space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log))
530 return true;
531 return false;
532}
533
d1583a38
DC
534/*
535 * Insert the log items into the CIL and calculate the difference in space
536 * consumed by the item. Add the space to the checkpoint ticket and calculate
537 * if the change requires additional log metadata. If it does, take that space
42b2aa86 538 * as well. Remove the amount of space we added to the checkpoint ticket from
d1583a38
DC
539 * the current transaction ticket so that the accounting works out correctly.
540 */
3b93c7aa
DC
541static void
542xlog_cil_insert_items(
f7bdf03a 543 struct xlog *log,
0d227466
DC
544 struct xfs_trans *tp,
545 uint32_t released_space)
3b93c7aa 546{
d1583a38
DC
547 struct xfs_cil *cil = log->l_cilp;
548 struct xfs_cil_ctx *ctx = cil->xc_ctx;
e6631f85 549 struct xfs_log_item *lip;
d1583a38 550 int len = 0;
e2f23426 551 int iovhdr_res = 0, split_res = 0, ctx_res = 0;
7c8ade21
DC
552 int space_used;
553 struct xlog_cil_pcp *cilpcp;
3b93c7aa 554
991aaf65 555 ASSERT(tp);
d1583a38
DC
556
557 /*
d1583a38
DC
558 * We can do this safely because the context can't checkpoint until we
559 * are done so it doesn't matter exactly how we update the CIL.
560 */
593e3439 561 xlog_cil_insert_format_items(log, tp, &len);
991aaf65 562
7c8ade21
DC
563 /*
564 * Subtract the space released by intent cancelation from the space we
565 * consumed so that we remove it from the CIL space and add it back to
566 * the current transaction reservation context.
567 */
568 len -= released_space;
569
570 /*
571 * Grab the per-cpu pointer for the CIL before we start any accounting.
572 * That ensures that we are running with pre-emption disabled and so we
573 * can't be scheduled away between split sample/update operations that
574 * are done without outside locking to serialise them.
575 */
576 cilpcp = get_cpu_ptr(cil->xc_pcp);
577
d1583a38 578 /*
88591e7f
DC
579 * We need to take the CIL checkpoint unit reservation on the first
580 * commit into the CIL. Test the XLOG_CIL_EMPTY bit first so we don't
12380d23
DC
581 * unnecessarily do an atomic op in the fast path here. We don't need to
582 * hold the xc_cil_lock here to clear the XLOG_CIL_EMPTY bit as we are
583 * under the xc_ctx_lock here and that needs to be held exclusively to
584 * reset the XLOG_CIL_EMPTY bit.
d1583a38 585 */
88591e7f 586 if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) &&
12380d23 587 test_and_clear_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
e2f23426 588 ctx_res = ctx->ticket->t_unit_res;
12380d23 589
31151cc3
DC
590 /*
591 * Check if we need to steal iclog headers. atomic_read() is not a
592 * locked atomic operation, so we can check the value before we do any
593 * real atomic ops in the fast path. If we've already taken the CIL unit
594 * reservation from this commit, we've already got one iclog header
595 * space reserved so we have to account for that otherwise we risk
596 * overrunning the reservation on this ticket.
597 *
598 * If the CIL is already at the hard limit, we might need more header
599 * space than originally reserved. So steal more header space from every
600 * commit that occurs once we are over the hard limit to ensure the CIL
601 * push won't run out of reservation space.
602 *
603 * This can steal more than we need, but that's OK.
7c8ade21
DC
604 *
605 * The cil->xc_ctx_lock provides the serialisation necessary for safely
606 * calling xlog_cil_over_hard_limit() in this context.
31151cc3 607 */
7c8ade21 608 space_used = atomic_read(&ctx->space_used) + cilpcp->space_used + len;
31151cc3 609 if (atomic_read(&cil->xc_iclog_hdrs) > 0 ||
7c8ade21
DC
610 xlog_cil_over_hard_limit(log, space_used)) {
611 split_res = log->l_iclog_hsize +
31151cc3
DC
612 sizeof(struct xlog_op_header);
613 if (ctx_res)
614 ctx_res += split_res * (tp->t_ticket->t_iclog_hdrs - 1);
615 else
616 ctx_res = split_res * tp->t_ticket->t_iclog_hdrs;
617 atomic_sub(tp->t_ticket->t_iclog_hdrs, &cil->xc_iclog_hdrs);
d1583a38 618 }
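	/*
	 * Stash any stolen reservation in the per-cpu structure; it is folded
	 * back into the CIL ticket when the context is aggregated at push
	 * time.
	 */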
1dd2a2c1 619 cilpcp->space_reserved += ctx_res;
31151cc3 620
d4ca1d55 621 /*
7c8ade21
DC
622 * Accurately account when over the soft limit, otherwise fold the
623 * percpu count into the global count if over the per-cpu threshold.
d4ca1d55 624 */
7c8ade21
DC
625 if (!test_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags)) {
626 atomic_add(len, &ctx->space_used);
627 } else if (cilpcp->space_used + len >
628 (XLOG_CIL_SPACE_LIMIT(log) / num_online_cpus())) {
629 space_used = atomic_add_return(cilpcp->space_used + len,
630 &ctx->space_used);
631 cilpcp->space_used = 0;
632
633 /*
634 * If we just transitioned over the soft limit, we need to
635 * transition to the global atomic counter.
636 */
637 if (space_used >= XLOG_CIL_SPACE_LIMIT(log))
638 xlog_cil_insert_pcp_aggregate(cil, ctx);
639 } else {
640 cilpcp->space_used += len;
d4ca1d55 641 }
df7a4a21
DC
642 /* attach the transaction to the CIL if it has any busy extents */
643 if (!list_empty(&tp->t_busy))
644 list_splice_init(&tp->t_busy, &cilpcp->busy_extents);
7c8ade21
DC
645 put_cpu_ptr(cilpcp);
646
e2f23426
BF
647 /*
648 * Now (re-)position everything modified at the tail of the CIL.
649 * We do this here so we only need to take the CIL lock once during
650 * the transaction commit.
651 */
1dd2a2c1 652 spin_lock(&cil->xc_cil_lock);
e6631f85 653 list_for_each_entry(lip, &tp->t_items, li_trans) {
e2f23426 654 /* Skip items which aren't dirty in this transaction. */
e6631f85 655 if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
e2f23426
BF
656 continue;
657
658 /*
659 * Only move the item if it isn't already at the tail. This is
660 * to prevent a transient list_empty() state when reinserting
661 * an item that is already the only item in the CIL.
662 */
663 if (!list_is_last(&lip->li_cil, &cil->xc_cil))
664 list_move_tail(&lip->li_cil, &cil->xc_cil);
665 }
666
d1583a38 667 spin_unlock(&cil->xc_cil_lock);
d4ca1d55 668
7c8ade21
DC
669 /*
670 * If we've overrun the reservation, dump the tx details before we move
671 * the log items. Shutdown is imminent...
672 */
673 tp->t_ticket->t_curr_res -= ctx_res + len;
674 if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
675 xfs_warn(log->l_mp, "Transaction log reservation overrun:");
676 xfs_warn(log->l_mp,
677 " log items: %d bytes (iov hdrs: %d bytes)",
678 len, iovhdr_res);
679 xfs_warn(log->l_mp, " split region headers: %d bytes",
680 split_res);
681 xfs_warn(log->l_mp, " ctx ticket: %d bytes", ctx_res);
682 xlog_print_trans(tp);
b5f17bec 683 xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
7c8ade21 684 }
71e330b5
DC
685}
686
687static void
688xlog_cil_free_logvec(
689 struct xfs_log_vec *log_vector)
690{
691 struct xfs_log_vec *lv;
692
693 for (lv = log_vector; lv; ) {
694 struct xfs_log_vec *next = lv->lv_next;
71e330b5
DC
695 kmem_free(lv);
696 lv = next;
697 }
698}
699
4560e78f
CH
700static void
701xlog_discard_endio_work(
702 struct work_struct *work)
703{
704 struct xfs_cil_ctx *ctx =
705 container_of(work, struct xfs_cil_ctx, discard_endio_work);
706 struct xfs_mount *mp = ctx->cil->xc_log->l_mp;
707
708 xfs_extent_busy_clear(mp, &ctx->busy_extents, false);
709 kmem_free(ctx);
710}
711
712/*
713 * Queue up the actual completion to a thread to avoid IRQ-safe locking for
714 * pagb_lock. Note that we need an unbounded workqueue, otherwise we might
715 * get the execution delayed up to 30 seconds for weird reasons.
716 */
717static void
718xlog_discard_endio(
719 struct bio *bio)
720{
721 struct xfs_cil_ctx *ctx = bio->bi_private;
722
723 INIT_WORK(&ctx->discard_endio_work, xlog_discard_endio_work);
724 queue_work(xfs_discard_wq, &ctx->discard_endio_work);
ea7bd56f 725 bio_put(bio);
4560e78f
CH
726}
727
728static void
729xlog_discard_busy_extents(
730 struct xfs_mount *mp,
731 struct xfs_cil_ctx *ctx)
732{
733 struct list_head *list = &ctx->busy_extents;
734 struct xfs_extent_busy *busyp;
735 struct bio *bio = NULL;
736 struct blk_plug plug;
737 int error = 0;
738
0560f31a 739 ASSERT(xfs_has_discard(mp));
4560e78f
CH
740
741 blk_start_plug(&plug);
742 list_for_each_entry(busyp, list, list) {
743 trace_xfs_discard_extent(mp, busyp->agno, busyp->bno,
744 busyp->length);
745
746 error = __blkdev_issue_discard(mp->m_ddev_targp->bt_bdev,
747 XFS_AGB_TO_DADDR(mp, busyp->agno, busyp->bno),
748 XFS_FSB_TO_BB(mp, busyp->length),
44abff2c 749 GFP_NOFS, &bio);
4560e78f
CH
750 if (error && error != -EOPNOTSUPP) {
751 xfs_info(mp,
752 "discard failed for extent [0x%llx,%u], error %d",
753 (unsigned long long)busyp->bno,
754 busyp->length,
755 error);
756 break;
757 }
758 }
759
760 if (bio) {
761 bio->bi_private = ctx;
762 bio->bi_end_io = xlog_discard_endio;
763 submit_bio(bio);
764 } else {
765 xlog_discard_endio_work(&ctx->discard_endio_work);
766 }
767 blk_finish_plug(&plug);
768}
769
71e330b5
DC
770/*
771 * Mark all items committed and clear busy extents. We free the log vector
772 * chains in a separate pass so that we unpin the log items as quickly as
773 * possible.
774 */
775static void
776xlog_cil_committed(
12e6a0f4 777 struct xfs_cil_ctx *ctx)
71e330b5 778{
e84661aa 779 struct xfs_mount *mp = ctx->cil->xc_log->l_mp;
2039a272 780 bool abort = xlog_is_shutdown(ctx->cil->xc_log);
71e330b5 781
545aa41f
BF
782 /*
783 * If the I/O failed, we're aborting the commit and already shutdown.
784 * Wake any commit waiters before aborting the log items so we don't
785 * block async log pushers on callbacks. Async log pushers explicitly do
786 * not wait on log force completion because they may be holding locks
787 * required to unpin items.
788 */
789 if (abort) {
790 spin_lock(&ctx->cil->xc_push_lock);
68a74dca 791 wake_up_all(&ctx->cil->xc_start_wait);
545aa41f
BF
792 wake_up_all(&ctx->cil->xc_commit_wait);
793 spin_unlock(&ctx->cil->xc_push_lock);
794 }
795
0e57f6a3
DC
796 xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain,
797 ctx->start_lsn, abort);
71e330b5 798
4ecbfe63
DC
799 xfs_extent_busy_sort(&ctx->busy_extents);
800 xfs_extent_busy_clear(mp, &ctx->busy_extents,
0560f31a 801 xfs_has_discard(mp) && !abort);
71e330b5 802
4bb928cd 803 spin_lock(&ctx->cil->xc_push_lock);
71e330b5 804 list_del(&ctx->committing);
4bb928cd 805 spin_unlock(&ctx->cil->xc_push_lock);
71e330b5
DC
806
807 xlog_cil_free_logvec(ctx->lv_chain);
e84661aa 808
4560e78f
CH
809 if (!list_empty(&ctx->busy_extents))
810 xlog_discard_busy_extents(mp, ctx);
811 else
812 kmem_free(ctx);
71e330b5
DC
813}
814
89ae379d
CH
815void
816xlog_cil_process_committed(
12e6a0f4 817 struct list_head *list)
89ae379d
CH
818{
819 struct xfs_cil_ctx *ctx;
820
821 while ((ctx = list_first_entry_or_null(list,
822 struct xfs_cil_ctx, iclog_entry))) {
823 list_del(&ctx->iclog_entry);
12e6a0f4 824 xlog_cil_committed(ctx);
89ae379d
CH
825 }
826}
827
c45aba40
DC
828/*
829* Record the LSN of the iclog we were just granted space to start writing into.
830* If the context doesn't have a start_lsn recorded, then this iclog will
831* contain the start record for the checkpoint. Otherwise this write contains
832* the commit record for the checkpoint.
833*/
834void
835xlog_cil_set_ctx_write_state(
836 struct xfs_cil_ctx *ctx,
837 struct xlog_in_core *iclog)
838{
839 struct xfs_cil *cil = ctx->cil;
840 xfs_lsn_t lsn = be64_to_cpu(iclog->ic_header.h_lsn);
841
842 ASSERT(!ctx->commit_lsn);
caa80090
DC
843 if (!ctx->start_lsn) {
844 spin_lock(&cil->xc_push_lock);
68a74dca
DC
845 /*
846 * The LSN we need to pass to the log items on transaction
847 * commit is the LSN reported by the first log vector write, not
848 * the commit lsn. If we use the commit record lsn then we can
919edbad
DC
849 * move the grant write head beyond the tail LSN and overwrite
850 * it.
68a74dca 851 */
c45aba40 852 ctx->start_lsn = lsn;
68a74dca 853 wake_up_all(&cil->xc_start_wait);
caa80090 854 spin_unlock(&cil->xc_push_lock);
919edbad
DC
855
856 /*
857 * Make sure the metadata we are about to overwrite in the log
858 * has been flushed to stable storage before this iclog is
859 * issued.
860 */
861 spin_lock(&cil->xc_log->l_icloglock);
862 iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
863 spin_unlock(&cil->xc_log->l_icloglock);
caa80090
DC
864 return;
865 }
866
867 /*
868 * Take a reference to the iclog for the context so that we still hold
869 * it when xlog_write is done and has released it. This means the
870 * context controls when the iclog is released for IO.
871 */
872 atomic_inc(&iclog->ic_refcnt);
873
874 /*
875 * xlog_state_get_iclog_space() guarantees there is enough space in the
876 * iclog for an entire commit record, so we can attach the context
877 * callbacks now. This needs to be done before we make the commit_lsn
878 * visible to waiters so that checkpoints with commit records in the
879 * same iclog order their IO completion callbacks in the same order that
880 * the commit records appear in the iclog.
881 */
882 spin_lock(&cil->xc_log->l_icloglock);
883 list_add_tail(&ctx->iclog_entry, &iclog->ic_callbacks);
884 spin_unlock(&cil->xc_log->l_icloglock);
885
886 /*
887 * Now we can record the commit LSN and wake anyone waiting for this
888 * sequence to have the ordered commit record assigned to a physical
889 * location in the log.
890 */
891 spin_lock(&cil->xc_push_lock);
892 ctx->commit_iclog = iclog;
893 ctx->commit_lsn = lsn;
894 wake_up_all(&cil->xc_commit_wait);
c45aba40
DC
895 spin_unlock(&cil->xc_push_lock);
896}
897
898
2ce82b72 899/*
bf034bc8
DC
900 * Ensure that the order of log writes follows checkpoint sequence order. This
901 * relies on the context LSN being zero until the log write has guaranteed the
902 * LSN that the log write will start at via xlog_state_get_iclog_space().
903 */
68a74dca
DC
904enum _record_type {
905 _START_RECORD,
906 _COMMIT_RECORD,
907};
908
bf034bc8
DC
909static int
910xlog_cil_order_write(
911 struct xfs_cil *cil,
68a74dca
DC
912 xfs_csn_t sequence,
913 enum _record_type record)
bf034bc8
DC
914{
915 struct xfs_cil_ctx *ctx;
916
917restart:
918 spin_lock(&cil->xc_push_lock);
919 list_for_each_entry(ctx, &cil->xc_committing, committing) {
920 /*
921 * Avoid getting stuck in this loop because we were woken by the
922 * shutdown, but then went back to sleep once already in the
923 * shutdown state.
924 */
925 if (xlog_is_shutdown(cil->xc_log)) {
926 spin_unlock(&cil->xc_push_lock);
927 return -EIO;
928 }
929
930 /*
931 * Higher sequences will wait for this one so skip them.
932 * Don't wait for our own sequence, either.
933 */
934 if (ctx->sequence >= sequence)
935 continue;
68a74dca
DC
936
937 /* Wait until the LSN for the record has been recorded. */
938 switch (record) {
939 case _START_RECORD:
940 if (!ctx->start_lsn) {
941 xlog_wait(&cil->xc_start_wait, &cil->xc_push_lock);
942 goto restart;
943 }
944 break;
945 case _COMMIT_RECORD:
946 if (!ctx->commit_lsn) {
947 xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
948 goto restart;
949 }
950 break;
bf034bc8
DC
951 }
952 }
953 spin_unlock(&cil->xc_push_lock);
954 return 0;
955}
956
68a74dca
DC
957/*
958 * Write out the log vector change now attached to the CIL context. This will
959 * write a start record that needs to be strictly ordered in ascending CIL
960 * sequence order so that log recovery will always use in-order start LSNs when
961 * replaying checkpoints.
962 */
963static int
964xlog_cil_write_chain(
965 struct xfs_cil_ctx *ctx,
d80fc291
DC
966 struct xfs_log_vec *chain,
967 uint32_t chain_len)
68a74dca
DC
968{
969 struct xlog *log = ctx->cil->xc_log;
970 int error;
971
972 error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD);
973 if (error)
974 return error;
14b07ecd 975 return xlog_write(log, ctx, chain, ctx->ticket, chain_len);
68a74dca
DC
976}
977
bf034bc8
DC
978/*
979 * Write out the commit record of a checkpoint transaction to close off a
980 * running log write. These commit records are strictly ordered in ascending CIL
981 * sequence order so that log recovery will always replay the checkpoints in the
982 * correct order.
2ce82b72
DC
983 */
984static int
985xlog_cil_write_commit_record(
caa80090 986 struct xfs_cil_ctx *ctx)
2ce82b72 987{
c45aba40 988 struct xlog *log = ctx->cil->xc_log;
54021b62
DC
989 struct xlog_op_header ophdr = {
990 .oh_clientid = XFS_TRANSACTION,
991 .oh_tid = cpu_to_be32(ctx->ticket->t_tid),
992 .oh_flags = XLOG_COMMIT_TRANS,
993 };
c45aba40 994 struct xfs_log_iovec reg = {
54021b62
DC
995 .i_addr = &ophdr,
996 .i_len = sizeof(struct xlog_op_header),
2ce82b72
DC
997 .i_type = XLOG_REG_TYPE_COMMIT,
998 };
c45aba40 999 struct xfs_log_vec vec = {
2ce82b72
DC
1000 .lv_niovecs = 1,
1001 .lv_iovecp = &reg,
1002 };
c45aba40 1003 int error;
2ce82b72
DC
1004
1005 if (xlog_is_shutdown(log))
1006 return -EIO;
1007
68a74dca
DC
1008 error = xlog_cil_order_write(ctx->cil, ctx->sequence, _COMMIT_RECORD);
1009 if (error)
1010 return error;
1011
54021b62
DC
1012 /* account for space used by record data */
1013 ctx->ticket->t_curr_res -= reg.i_len;
14b07ecd 1014 error = xlog_write(log, ctx, &vec, ctx->ticket, reg.i_len);
2ce82b72 1015 if (error)
b5f17bec 1016 xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
2ce82b72
DC
1017 return error;
1018}
1019
735fbf67 1020struct xlog_cil_trans_hdr {
6eaed95e 1021 struct xlog_op_header oph[2];
735fbf67 1022 struct xfs_trans_header thdr;
6eaed95e 1023 struct xfs_log_iovec lhdr[2];
735fbf67
DC
1024};
1025
1026/*
1027 * Build a checkpoint transaction header to begin the journal transaction. We
1028 * need to account for the space used by the transaction header here as it is
1029 * not accounted for in xlog_write().
6eaed95e
DC
1030 *
1031 * This is the only place we write a transaction header, so we also build the
1032 * log opheaders that indicate the start of a log transaction and wrap the
1033 * transaction header. We keep the start record in its own log vector rather
1034 * than compacting them into a single region as this ends up making the logic
1035 * in xlog_write() for handling empty opheaders for start, commit and unmount
1036 * records much simpler.
735fbf67
DC
1037 */
1038static void
1039xlog_cil_build_trans_hdr(
1040 struct xfs_cil_ctx *ctx,
1041 struct xlog_cil_trans_hdr *hdr,
1042 struct xfs_log_vec *lvhdr,
1043 int num_iovecs)
1044{
1045 struct xlog_ticket *tic = ctx->ticket;
6eaed95e 1046 __be32 tid = cpu_to_be32(tic->t_tid);
735fbf67
DC
1047
1048 memset(hdr, 0, sizeof(*hdr));
1049
6eaed95e
DC
1050 /* Log start record */
1051 hdr->oph[0].oh_tid = tid;
1052 hdr->oph[0].oh_clientid = XFS_TRANSACTION;
1053 hdr->oph[0].oh_flags = XLOG_START_TRANS;
1054
1055 /* log iovec region pointer */
1056 hdr->lhdr[0].i_addr = &hdr->oph[0];
1057 hdr->lhdr[0].i_len = sizeof(struct xlog_op_header);
1058 hdr->lhdr[0].i_type = XLOG_REG_TYPE_LRHEADER;
1059
1060 /* log opheader */
1061 hdr->oph[1].oh_tid = tid;
1062 hdr->oph[1].oh_clientid = XFS_TRANSACTION;
1063 hdr->oph[1].oh_len = cpu_to_be32(sizeof(struct xfs_trans_header));
1064
1065 /* transaction header in host byte order format */
735fbf67
DC
1066 hdr->thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
1067 hdr->thdr.th_type = XFS_TRANS_CHECKPOINT;
1068 hdr->thdr.th_tid = tic->t_tid;
1069 hdr->thdr.th_num_items = num_iovecs;
735fbf67 1070
6eaed95e
DC
1071 /* log iovec region pointer */
1072 hdr->lhdr[1].i_addr = &hdr->oph[1];
1073 hdr->lhdr[1].i_len = sizeof(struct xlog_op_header) +
1074 sizeof(struct xfs_trans_header);
1075 hdr->lhdr[1].i_type = XLOG_REG_TYPE_TRANSHDR;
1076
6eaed95e
DC
1077 lvhdr->lv_niovecs = 2;
1078 lvhdr->lv_iovecp = &hdr->lhdr[0];
d80fc291 1079 lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len;
735fbf67 1080 lvhdr->lv_next = ctx->lv_chain;
d80fc291
DC
1081
1082 tic->t_curr_res -= lvhdr->lv_bytes;
735fbf67
DC
1083}
1084
22b1afc5
DC
1085/*
1086 * Pull all the log vectors off the items in the CIL, and remove the items from
1087 * the CIL. We don't need the CIL lock here because it's only needed on the
1088 * transaction commit side which is currently locked out by the flush lock.
0d227466
DC
1089 *
1090 * If a log item is marked with a whiteout, we do not need to write it to the
1091 * journal and so we just move them to the whiteout list for the caller to
1092 * dispose of appropriately.
22b1afc5
DC
1093 */
1094static void
1095xlog_cil_build_lv_chain(
1096 struct xfs_cil *cil,
1097 struct xfs_cil_ctx *ctx,
0d227466 1098 struct list_head *whiteouts,
22b1afc5
DC
1099 uint32_t *num_iovecs,
1100 uint32_t *num_bytes)
1101{
1102 struct xfs_log_vec *lv = NULL;
1103
1104 while (!list_empty(&cil->xc_cil)) {
1105 struct xfs_log_item *item;
1106
1107 item = list_first_entry(&cil->xc_cil,
1108 struct xfs_log_item, li_cil);
0d227466
DC
1109
1110 if (test_bit(XFS_LI_WHITEOUT, &item->li_flags)) {
1111 list_move(&item->li_cil, whiteouts);
1112 trace_xfs_cil_whiteout_skip(item);
1113 continue;
1114 }
1115
22b1afc5
DC
1116 list_del_init(&item->li_cil);
1117 if (!ctx->lv_chain)
1118 ctx->lv_chain = item->li_lv;
1119 else
1120 lv->lv_next = item->li_lv;
1121 lv = item->li_lv;
1122 item->li_lv = NULL;
1123 *num_iovecs += lv->lv_niovecs;
1124
1125 /* we don't write ordered log vectors */
1126 if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
1127 *num_bytes += lv->lv_bytes;
1128 }
1129}
1130
0d227466
DC
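/*
 * Unpin the log items that were marked with a whiteout and hence were never
 * written to the journal.
 */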
1131static void
1132xlog_cil_cleanup_whiteouts(
1133 struct list_head *whiteouts)
1134{
1135 while (!list_empty(whiteouts)) {
1136 struct xfs_log_item *item = list_first_entry(whiteouts,
1137 struct xfs_log_item, li_cil);
1138 list_del_init(&item->li_cil);
1139 trace_xfs_cil_whiteout_unpin(item);
1140 item->li_ops->iop_unpin(item, 1);
1141 }
1142}
1143
71e330b5 1144/*
c7cc296d
CH
1145 * Push the Committed Item List to the log.
1146 *
1147 * If the current sequence is the same as xc_push_seq we need to do a flush. If
1148 * xc_push_seq is less than the current sequence, then it has already been
a44f13ed
DC
1149 * flushed and we don't need to do anything - the caller will wait for it to
1150 * complete if necessary.
1151 *
c7cc296d
CH
1152 * xc_push_seq is checked unlocked against the sequence number for a match.
1153 * Hence we can allow log forces to run racily and not issue pushes for the
1154 * same sequence twice. If we get a race between multiple pushes for the same
1155 * sequence they will block on the first one and then abort, hence avoiding
1156 * needless pushes.
71e330b5 1157 */
c7cc296d
CH
1158static void
1159xlog_cil_push_work(
1160 struct work_struct *work)
71e330b5 1161{
39823d0f
DC
1162 struct xfs_cil_ctx *ctx =
1163 container_of(work, struct xfs_cil_ctx, push_work);
1164 struct xfs_cil *cil = ctx->cil;
c7cc296d 1165 struct xlog *log = cil->xc_log;
71e330b5 1166 struct xfs_cil_ctx *new_ctx;
d80fc291
DC
1167 int num_iovecs = 0;
1168 int num_bytes = 0;
71e330b5 1169 int error = 0;
735fbf67 1170 struct xlog_cil_trans_hdr thdr;
71e330b5 1171 struct xfs_log_vec lvhdr = { NULL };
0dc8f7f1 1172 xfs_csn_t push_seq;
0020a190 1173 bool push_commit_stable;
0d227466 1174 LIST_HEAD (whiteouts);
71e330b5 1175
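	/*
	 * Allocate the new checkpoint context and its log ticket before we
	 * take any CIL locks.
	 */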
39823d0f 1176 new_ctx = xlog_cil_ctx_alloc();
71e330b5
DC
1177 new_ctx->ticket = xlog_cil_ticket_alloc(log);
1178
4c2d542f 1179 down_write(&cil->xc_ctx_lock);
71e330b5 1180
4bb928cd 1181 spin_lock(&cil->xc_push_lock);
4c2d542f
DC
1182 push_seq = cil->xc_push_seq;
1183 ASSERT(push_seq <= ctx->sequence);
0020a190
DC
1184 push_commit_stable = cil->xc_push_commit_stable;
1185 cil->xc_push_commit_stable = false;
71e330b5 1186
0e7ab7ef 1187 /*
19f4e7cc
DC
1188 * As we are about to switch to a new, empty CIL context, we no longer
1189 * need to throttle tasks on CIL space overruns. Wake any waiters that
1190 * the hard push throttle may have caught so they can start committing
1191 * to the new context. The ctx->xc_push_lock provides the serialisation
1192 * necessary for safely using the lockless waitqueue_active() check in
1193 * this context.
0e7ab7ef 1194 */
19f4e7cc 1195 if (waitqueue_active(&cil->xc_push_wait))
c7f87f39 1196 wake_up_all(&cil->xc_push_wait);
0e7ab7ef 1197
7c8ade21
DC
1198 xlog_cil_push_pcp_aggregate(cil, ctx);
1199
4c2d542f
DC
1200 /*
1201 * Check if we've anything to push. If there is nothing, then we don't
1202 * move on to a new sequence number and so we have to be able to push
1203 * this sequence again later.
1204 */
88591e7f 1205 if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
4c2d542f 1206 cil->xc_push_seq = 0;
4bb928cd 1207 spin_unlock(&cil->xc_push_lock);
a44f13ed 1208 goto out_skip;
4c2d542f 1209 }
4c2d542f 1210
a44f13ed 1211
cf085a1b 1212 /* check for a previously pushed sequence */
39823d0f 1213 if (push_seq < ctx->sequence) {
8af3dcd3 1214 spin_unlock(&cil->xc_push_lock);
df806158 1215 goto out_skip;
8af3dcd3
DC
1216 }
1217
1218 /*
1219 * We are now going to push this context, so add it to the committing
1220 * list before we do anything else. This ensures that anyone waiting on
1221 * this push can easily detect the difference between a "push in
1222 * progress" and "CIL is empty, nothing to do".
1223 *
1224 * IOWs, a wait loop can now check for:
1225 * the current sequence not being found on the committing list;
1226 * an empty CIL; and
1227 * an unchanged sequence number
1228 * to detect a push that had nothing to do and therefore does not need
1229 * waiting on. If the CIL is not empty, we get put on the committing
1230 * list before emptying the CIL and bumping the sequence number. Hence
1231 * an empty CIL and an unchanged sequence number means we jumped out
1232 * above after doing nothing.
1233 *
1234 * Hence the waiter will either find the commit sequence on the
1235 * committing list or the sequence number will be unchanged and the CIL
1236 * still dirty. In that latter case, the push has not yet started, and
1237 * so the waiter will have to continue trying to check the CIL
1238 * committing list until it is found. In extreme cases of delay, the
1239 * sequence may fully commit between the attempts the wait makes to wait
1240 * on the commit sequence.
1241 */
1242 list_add(&ctx->committing, &cil->xc_committing);
1243 spin_unlock(&cil->xc_push_lock);
df806158 1244
0d227466 1245 xlog_cil_build_lv_chain(cil, ctx, &whiteouts, &num_iovecs, &num_bytes);
71e330b5
DC
1246
1247 /*
39823d0f 1248 * Switch the contexts so we can drop the context lock and move out
71e330b5
DC
1249 * of a shared context. We can't just go straight to the commit record,
1250 * though - we need to synchronise with previous and future commits so
1251 * that the commit records are correctly ordered in the log to ensure
1252 * that we process items during log IO completion in the correct order.
1253 *
1254 * For example, if we get an EFI in one checkpoint and the EFD in the
1255 * next (e.g. due to log forces), we do not want the checkpoint with
1256 * the EFD to be committed before the checkpoint with the EFI. Hence
1257 * we must strictly order the commit records of the checkpoints so
1258 * that: a) the checkpoint callbacks are attached to the iclogs in the
1259 * correct order; and b) the checkpoints are replayed in correct order
1260 * in log recovery.
1261 *
1262 * Hence we need to add this context to the committing context list so
1263 * that higher sequences will wait for us to write out a commit record
1264 * before they do.
f876e446 1265 *
5f9b4b0d 1266 * xfs_log_force_seq requires us to mirror the new sequence into the cil
f876e446
DC
1267 * structure atomically with the addition of this sequence to the
1268 * committing list. This also ensures that we can do unlocked checks
1269 * against the current sequence in log forces without risking
1270 * dereferencing a freed context pointer.
71e330b5 1271 */
4bb928cd 1272 spin_lock(&cil->xc_push_lock);
39823d0f 1273 xlog_cil_ctx_switch(cil, new_ctx);
4bb928cd 1274 spin_unlock(&cil->xc_push_lock);
71e330b5
DC
1275 up_write(&cil->xc_ctx_lock);
1276
1277 /*
1278 * Build a checkpoint transaction header and write it to the log to
1279 * begin the transaction. We need to account for the space used by the
1280 * transaction header here as it is not accounted for in xlog_write().
71e330b5 1281 */
735fbf67 1282 xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
d80fc291 1283 num_bytes += lvhdr.lv_bytes;
71e330b5 1284
d80fc291 1285 error = xlog_cil_write_chain(ctx, &lvhdr, num_bytes);
bf034bc8
DC
1286 if (error)
1287 goto out_abort_free_ticket;
71e330b5 1288
caa80090 1289 error = xlog_cil_write_commit_record(ctx);
dd401770
DC
1290 if (error)
1291 goto out_abort_free_ticket;
1292
735fbf67 1293 xfs_log_ticket_ungrant(log, ctx->ticket);
71e330b5 1294
a79b28c2 1295 /*
1effb72a
DC
1296 * If the checkpoint spans multiple iclogs, wait for all previous iclogs
1297 * to complete before we submit the commit_iclog. We can't use state
1298 * checks for this - ACTIVE can be either a past completed iclog or a
1299 * future iclog being filled, while WANT_SYNC through SYNC_DONE can be a
1300 * past or future iclog awaiting IO or ordered IO completion to be run.
1302 * In the latter case, if it's a future iclog and we wait on it, then we
1302 * will hang because it won't get processed through to ic_force_wait
1303 * wakeup until this commit_iclog is written to disk. Hence we use the
1304 * iclog header lsn and compare it to the commit lsn to determine if we
1305 * need to wait on iclogs or not.
a79b28c2 1306 */
caa80090 1307 spin_lock(&log->l_icloglock);
c45aba40 1308 if (ctx->start_lsn != ctx->commit_lsn) {
1effb72a
DC
1309 xfs_lsn_t plsn;
1310
caa80090 1311 plsn = be64_to_cpu(ctx->commit_iclog->ic_prev->ic_header.h_lsn);
c45aba40 1312 if (plsn && XFS_LSN_CMP(plsn, ctx->commit_lsn) < 0) {
1effb72a
DC
1313 /*
1314 * Waiting on ic_force_wait orders the completion of
1315 * iclogs older than ic_prev. Hence we only need to wait
1316 * on the most recent older iclog here.
1317 */
caa80090 1318 xlog_wait_on_iclog(ctx->commit_iclog->ic_prev);
1effb72a
DC
1319 spin_lock(&log->l_icloglock);
1320 }
1321
1322 /*
1323 * We need to issue a pre-flush so that the ordering for this
1324 * checkpoint is correctly preserved down to stable storage.
1325 */
caa80090 1326 ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
a79b28c2
DC
1327 }
1328
eef983ff
DC
1329 /*
1330 * The commit iclog must be written to stable storage to guarantee
1331 * journal IO vs metadata writeback IO is correctly ordered on stable
1332 * storage.
0020a190
DC
1333 *
1334 * If the push caller needs the commit to be immediately stable and the
1335 * commit_iclog is not yet marked as XLOG_STATE_WANT_SYNC to indicate it
1336 * will be written when released, switch its state to WANT_SYNC right
1337 * now.
eef983ff 1338 */
caa80090 1339 ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA;
0020a190
DC
1340 if (push_commit_stable &&
1341 ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
1342 xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
919edbad 1343 xlog_state_release_iclog(log, ctx->commit_iclog);
502a01fa
DC
1344
1345 /* Not safe to reference ctx now! */
1346
eef983ff 1347 spin_unlock(&log->l_icloglock);
0d227466 1348 xlog_cil_cleanup_whiteouts(&whiteouts);
c7cc296d 1349 return;
71e330b5
DC
1350
1351out_skip:
1352 up_write(&cil->xc_ctx_lock);
1353 xfs_log_ticket_put(new_ctx->ticket);
1354 kmem_free(new_ctx);
c7cc296d 1355 return;
71e330b5 1356
7db37c5e 1357out_abort_free_ticket:
735fbf67 1358 xfs_log_ticket_ungrant(log, ctx->ticket);
2039a272 1359 ASSERT(xlog_is_shutdown(log));
0d227466 1360 xlog_cil_cleanup_whiteouts(&whiteouts);
caa80090
DC
1361 if (!ctx->commit_iclog) {
1362 xlog_cil_committed(ctx);
1363 return;
1364 }
1365 spin_lock(&log->l_icloglock);
919edbad 1366 xlog_state_release_iclog(log, ctx->commit_iclog);
caa80090
DC
1367 /* Not safe to reference ctx now! */
1368 spin_unlock(&log->l_icloglock);
4c2d542f
DC
1369}
1370
1371/*
1372 * We need to push CIL every so often so we don't cache more than we can fit in
1373 * the log. The limit really is that a checkpoint can't be more than half the
1374 * log (the current checkpoint is not allowed to overwrite the previous
1375 * checkpoint), but commit latency and memory usage limit this to a smaller
1376 * size.
1377 */
1378static void
1379xlog_cil_push_background(
0e7ab7ef 1380 struct xlog *log) __releases(cil->xc_ctx_lock)
4c2d542f
DC
1381{
1382 struct xfs_cil *cil = log->l_cilp;
7c8ade21 1383 int space_used = atomic_read(&cil->xc_ctx->space_used);
4c2d542f
DC
1384
1385 /*
1386 * The cil won't be empty because we are called while holding the
88591e7f 1387 * context lock so whatever we added to the CIL will still be there.
4c2d542f
DC
1388 */
1389 ASSERT(!list_empty(&cil->xc_cil));
88591e7f 1390 ASSERT(!test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
4c2d542f
DC
1391
1392 /*
19f4e7cc 1393 * Don't do a background push if we haven't used up all the
4c2d542f
DC
1394 * space available yet.
1395 */
7c8ade21 1396 if (space_used < XLOG_CIL_SPACE_LIMIT(log)) {
0e7ab7ef 1397 up_read(&cil->xc_ctx_lock);
4c2d542f 1398 return;
0e7ab7ef 1399 }
4c2d542f 1400
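	/*
	 * Over the background push threshold: queue a push for the current
	 * sequence if one is not already pending.
	 */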
4bb928cd 1401 spin_lock(&cil->xc_push_lock);
4c2d542f
DC
1402 if (cil->xc_push_seq < cil->xc_current_sequence) {
1403 cil->xc_push_seq = cil->xc_current_sequence;
33c0dd78 1404 queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
4c2d542f 1405 }
0e7ab7ef
DC
1406
1407 /*
1408 * Drop the context lock now, we can't hold that if we need to sleep
1409 * because we are over the blocking threshold. The push_lock is still
1410 * held, so blocking threshold sleep/wakeup is still correctly
1411 * serialised here.
1412 */
1413 up_read(&cil->xc_ctx_lock);
1414
1415 /*
1416 * If we are well over the space limit, throttle the work that is being
19f4e7cc
DC
1417 * done until the push work on this context has begun. Enforce the hard
1418 * throttle on all transaction commits once it has been activated, even
1419 * if the committing transactions have resulted in the space usage
1420 * dipping back down under the hard limit.
1421 *
1422 * The ctx->xc_push_lock provides the serialisation necessary for safely
7c8ade21 1423 * calling xlog_cil_over_hard_limit() in this context.
0e7ab7ef 1424 */
7c8ade21 1425 if (xlog_cil_over_hard_limit(log, space_used)) {
0e7ab7ef 1426 trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
7c8ade21 1427 ASSERT(space_used < log->l_logsize);
c7f87f39 1428 xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
0e7ab7ef
DC
1429 return;
1430 }
1431
4bb928cd 1432 spin_unlock(&cil->xc_push_lock);
4c2d542f
DC
1433
1434}
1435
f876e446
DC
1436/*
1437 * xlog_cil_push_now() is used to trigger an immediate CIL push to the sequence
1438 * number that is passed. When it returns, the work will be queued for
0020a190
DC
1439 * @push_seq, but it won't be completed.
1440 *
1441 * If the caller is performing a synchronous force, we will flush the workqueue
1442 * to get previously queued work moving to minimise the wait time they will
1443 * undergo waiting for all outstanding pushes to complete. The caller is
1444 * expected to do the required waiting for push_seq to complete.
1445 *
1446 * If the caller is performing an async push, we need to ensure that the
1447 * checkpoint is fully flushed out of the iclogs when we finish the push. If we
1448 * don't do this, then the commit record may remain sitting in memory in an
1449 * ACTIVE iclog. This then requires another full log force to push to disk,
1450 * which defeats the purpose of having an async, non-blocking CIL force
1451 * mechanism. Hence in this case we need to pass a flag to the push work to
1452 * indicate it needs to flush the commit record itself.
f876e446 1453 */
4c2d542f 1454static void
f876e446 1455xlog_cil_push_now(
f7bdf03a 1456 struct xlog *log,
0020a190
DC
1457 xfs_lsn_t push_seq,
1458 bool async)
4c2d542f
DC
1459{
1460 struct xfs_cil *cil = log->l_cilp;
1461
1462 if (!cil)
1463 return;
1464
1465 ASSERT(push_seq && push_seq <= cil->xc_current_sequence);
1466
1467 /* start on any pending background push to minimise wait time on it */
0020a190 1468 if (!async)
33c0dd78 1469 flush_workqueue(cil->xc_push_wq);
4c2d542f 1470
70447e0a
DC
1471 spin_lock(&cil->xc_push_lock);
1472
1473 /*
1474 * If this is an async flush request, we always need to set the
1475 * xc_push_commit_stable flag even if something else has already queued
1476 * a push. The flush caller is asking for the CIL to be on stable
1477 * storage when the next push completes, so regardless of who has queued
1478 * the push, the flush requires stable semantics from it.
1479 */
1480 cil->xc_push_commit_stable = async;
1481
4c2d542f
DC
1482 /*
1483 * If the CIL is empty or we've already pushed the sequence then
70447e0a 1484 * there's no more work that we need to do.
4c2d542f 1485 */
88591e7f
DC
1486 if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) ||
1487 push_seq <= cil->xc_push_seq) {
4bb928cd 1488 spin_unlock(&cil->xc_push_lock);
4c2d542f
DC
1489 return;
1490 }
1491
1492 cil->xc_push_seq = push_seq;
33c0dd78 1493 queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
4bb928cd 1494 spin_unlock(&cil->xc_push_lock);
4c2d542f
DC
1495}
1496
2c6e24ce
DC
1497bool
1498xlog_cil_empty(
1499 struct xlog *log)
1500{
1501 struct xfs_cil *cil = log->l_cilp;
1502 bool empty = false;
1503
1504 spin_lock(&cil->xc_push_lock);
88591e7f 1505 if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
2c6e24ce
DC
1506 empty = true;
1507 spin_unlock(&cil->xc_push_lock);
1508 return empty;
1509}
1510
0d227466
DC
1511/*
1512 * If there are intent done items in this transaction and the related intent was
1513 * committed in the current (same) CIL checkpoint, we don't need to write either
1514 * the intent or intent done item to the journal as the change will be
1515 * journalled atomically within this checkpoint. As we cannot remove items from
1516 * the CIL here, mark the related intent with a whiteout so that the CIL push
1517 * can remove it rather than writing it to the journal. Then remove the intent
1518 * done item from the current transaction and release it so it doesn't get put
1519 * into the CIL at all.
1520 */
1521static uint32_t
1522xlog_cil_process_intents(
1523 struct xfs_cil *cil,
1524 struct xfs_trans *tp)
1525{
1526 struct xfs_log_item *lip, *ilip, *next;
1527 uint32_t len = 0;
1528
1529 list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
1530 if (!(lip->li_ops->flags & XFS_ITEM_INTENT_DONE))
1531 continue;
1532
1533 ilip = lip->li_ops->iop_intent(lip);
1534 if (!ilip || !xlog_item_in_current_chkpt(cil, ilip))
1535 continue;
1536 set_bit(XFS_LI_WHITEOUT, &ilip->li_flags);
1537 trace_xfs_cil_whiteout_mark(ilip);
1538 len += ilip->li_lv->lv_bytes;
1539 kmem_free(ilip->li_lv);
1540 ilip->li_lv = NULL;
1541
1542 xfs_trans_del_item(lip);
1543 lip->li_ops->iop_release(lip);
1544 }
1545 return len;
1546}
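To make the pairing that xlog_cil_process_intents() relies on concrete, below is a condensed sketch of an intent-done item's ->iop_intent hook, modelled on the extent-free (EFI/EFD) pair. The EFD_ITEM()/efd_efip names come from the extent-free item code; the body is a simplified illustration, not the verbatim upstream implementation.

static struct xfs_log_item *
xfs_efd_item_intent(
	struct xfs_log_item	*lip)
{
	/*
	 * The intent done item (EFD) carries a pointer back to the intent
	 * (EFI) it completes. Returning it lets the CIL mark that intent
	 * with a whiteout when both land in the same checkpoint.
	 */
	return &EFD_ITEM(lip)->efd_efip->efi_item;
}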
1547
a44f13ed
DC
1548/*
1549 * Commit a transaction with the given vector to the Committed Item List.
1550 *
1551 * To do this, we need to format the item, pin it in memory if required and
1552 * account for the space used by the transaction. Once we have done that we
1553 * need to release the unused reservation for the transaction, attach the
1554 * transaction to the checkpoint context so we carry the busy extents through
1555 * to checkpoint completion, and then unlock all the items in the transaction.
1556 *
a44f13ed
DC
1557 * The CIL context lock is taken here in read mode to lock out background
1558 * commit; we return without it held once background commits are
1559 * allowed again.
1560 */
c6f97264 1561void
5f9b4b0d
DC
1562xlog_cil_commit(
1563 struct xlog *log,
a44f13ed 1564 struct xfs_trans *tp,
5f9b4b0d 1565 xfs_csn_t *commit_seq,
70393313 1566 bool regrant)
a44f13ed 1567{
991aaf65 1568 struct xfs_cil *cil = log->l_cilp;
195cd83d 1569 struct xfs_log_item *lip, *next;
0d227466 1570 uint32_t released_space = 0;
a44f13ed 1571
b1c5ebb2
DC
1572 /*
1573 * Do all necessary memory allocation before we lock the CIL.
1574 * This ensures the allocation does not deadlock with a CIL
1575 * push in memory reclaim (e.g. from kswapd).
1576 */
1577 xlog_cil_alloc_shadow_bufs(log, tp);
1578
f5baac35 1579 /* lock out background commit */
991aaf65 1580 down_read(&cil->xc_ctx_lock);
f5baac35 1581
0d227466
DC
1582 if (tp->t_flags & XFS_TRANS_HAS_INTENT_DONE)
1583 released_space = xlog_cil_process_intents(cil, tp);
1584
1585 xlog_cil_insert_items(log, tp, released_space);
a44f13ed 1586
2039a272 1587 if (regrant && !xlog_is_shutdown(log))
8b41e3f9
CH
1588 xfs_log_ticket_regrant(log, tp->t_ticket);
1589 else
1590 xfs_log_ticket_ungrant(log, tp->t_ticket);
ba18781b 1591 tp->t_ticket = NULL;
a44f13ed
DC
1592 xfs_trans_unreserve_and_mod_sb(tp);
1593
1594 /*
1595 * Once all the items of the transaction have been copied to the CIL,
195cd83d 1596 * the items can be unlocked and possibly freed.
a44f13ed
DC
1597 *
1598 * This needs to be done before we drop the CIL context lock because we
1599 * have to update state in the log items and unlock them before they go
1600 * to disk. If we don't, then the CIL checkpoint can race with us and
1601 * we can run checkpoint completion before we've updated and unlocked
1602 * the log items. This affects (at least) processing of stale buffers,
1603 * inodes and EFIs.
1604 */
195cd83d
CH
1605 trace_xfs_trans_commit_items(tp, _RET_IP_);
1606 list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
1607 xfs_trans_del_item(lip);
1608 if (lip->li_ops->iop_committing)
5f9b4b0d 1609 lip->li_ops->iop_committing(lip, cil->xc_ctx->sequence);
195cd83d 1610 }
5f9b4b0d
DC
1611 if (commit_seq)
1612 *commit_seq = cil->xc_ctx->sequence;
a44f13ed 1613
0e7ab7ef
DC
1614 /* xlog_cil_push_background() releases cil->xc_ctx_lock */
1615 xlog_cil_push_background(log);
a44f13ed
DC
1616}
1617
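For context, here is a condensed sketch of how the transaction commit path typically consumes the returned commit sequence. It mirrors the shape of __xfs_trans_commit() (log, tp, mp, regrant and sync are as used there), with error handling and the surrounding logic omitted; it is illustrative rather than verbatim.

	xfs_csn_t		commit_seq = 0;
	int			error = 0;

	/* hand the transaction to the CIL; commit_seq identifies the checkpoint */
	xlog_cil_commit(log, tp, &commit_seq, regrant);
	xfs_trans_free(tp);

	/* synchronous transactions must wait for that checkpoint to hit disk */
	if (sync)
		error = xfs_log_force_seq(mp, commit_seq, XFS_LOG_SYNC, NULL);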
0020a190
DC
1618/*
1619 * Flush the CIL to stable storage but don't wait for it to complete. This
1620 * requires the CIL push to ensure the commit record for the push hits the disk,
1621 * but otherwise is no different to a push done from a log force.
1622 */
1623void
1624xlog_cil_flush(
1625 struct xlog *log)
1626{
1627 xfs_csn_t seq = log->l_cilp->xc_current_sequence;
1628
1629 trace_xfs_log_force(log->l_mp, seq, _RET_IP_);
1630 xlog_cil_push_now(log, seq, true);
70447e0a
DC
1631
1632 /*
1633 * If the CIL is empty, make sure that any previous checkpoint that may
1634 * still be in an active iclog is pushed to stable storage.
1635 */
1636 if (list_empty(&log->l_cilp->xc_cil))
1637 xfs_log_force(log->l_mp, 0);
0020a190
DC
1638}
1639
71e330b5
DC
1640/*
1641 * Conditionally push the CIL based on the sequence passed in.
1642 *
0020a190
DC
1643 * We only need to push if we haven't already pushed the sequence number given.
1644 * Hence the only time we will trigger a push here is if the push sequence is
1645 * the same as the current context.
71e330b5
DC
1646 *
1647 * We return the current commit lsn to allow the callers to determine if an
1648 * iclog flush is necessary following this call.
71e330b5
DC
1649 */
1650xfs_lsn_t
5f9b4b0d 1651xlog_cil_force_seq(
f7bdf03a 1652 struct xlog *log,
5f9b4b0d 1653 xfs_csn_t sequence)
71e330b5
DC
1654{
1655 struct xfs_cil *cil = log->l_cilp;
1656 struct xfs_cil_ctx *ctx;
1657 xfs_lsn_t commit_lsn = NULLCOMMITLSN;
1658
a44f13ed
DC
1659 ASSERT(sequence <= cil->xc_current_sequence);
1660
0020a190
DC
1661 if (!sequence)
1662 sequence = cil->xc_current_sequence;
1663 trace_xfs_log_force(log->l_mp, sequence, _RET_IP_);
1664
a44f13ed
DC
1665 /*
1666 * check to see if we need to force out the current context.
1667 * xlog_cil_push() handles racing pushes for the same sequence,
1668 * so no need to deal with it here.
1669 */
f876e446 1670restart:
0020a190 1671 xlog_cil_push_now(log, sequence, false);
71e330b5
DC
1672
1673 /*
1674 * See if we can find a previous sequence still committing.
71e330b5
DC
1675 * We need to wait for all previous sequence commits to complete
1676 * before allowing the force of push_seq to go ahead. Hence block
1677 * on commits for those as well.
1678 */
4bb928cd 1679 spin_lock(&cil->xc_push_lock);
71e330b5 1680 list_for_each_entry(ctx, &cil->xc_committing, committing) {
ac983517
DC
1681 /*
1682 * Avoid getting stuck in this loop because we were woken by the
1683 * shutdown, but then went back to sleep once already in the
1684 * shutdown state.
1685 */
2039a272 1686 if (xlog_is_shutdown(log))
ac983517 1687 goto out_shutdown;
a44f13ed 1688 if (ctx->sequence > sequence)
71e330b5
DC
1689 continue;
1690 if (!ctx->commit_lsn) {
1691 /*
1692 * It is still being pushed! Wait for the push to
1693 * complete, then start again from the beginning.
1694 */
0020a190 1695 XFS_STATS_INC(log->l_mp, xs_log_force_sleep);
4bb928cd 1696 xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
71e330b5
DC
1697 goto restart;
1698 }
a44f13ed 1699 if (ctx->sequence != sequence)
71e330b5
DC
1700 continue;
1701 /* found it! */
1702 commit_lsn = ctx->commit_lsn;
1703 }
f876e446
DC
1704
1705 /*
1706 * The call to xlog_cil_push_now() executes the push in the background.
1707 * Hence by the time we have got here our sequence may not have been
1708 * pushed yet. This is true if the current sequence still matches the
1709 * push sequence after the above wait loop and the CIL still contains
8af3dcd3
DC
1710 * dirty objects. This is guaranteed by the push code first adding the
1711 * context to the committing list before emptying the CIL.
f876e446 1712 *
8af3dcd3
DC
1713 * Hence if we don't find the context in the committing list and the
1714 * current sequence number is unchanged then the CIL contents are
1715 * significant. If the CIL is empty, it means there was nothing to push
1716 * and that means there is nothing to wait for. If the CIL is not empty,
1717 * it means we haven't yet started the push, because if it had started
1718 * we would have found the context on the committing list.
f876e446 1719 */
f876e446 1720 if (sequence == cil->xc_current_sequence &&
88591e7f 1721 !test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
f876e446
DC
1722 spin_unlock(&cil->xc_push_lock);
1723 goto restart;
1724 }
1725
4bb928cd 1726 spin_unlock(&cil->xc_push_lock);
71e330b5 1727 return commit_lsn;
ac983517
DC
1728
1729 /*
1730 * We detected a shutdown in progress. We need to trigger the log force
1731 * to pass through its iclog state machine error handling, even though
1732 * we are already in a shutdown state. Hence we can't return
1733 * NULLCOMMITLSN here as that has special meaning to log forces (i.e.
1734 * LSN is already stable), so we return a zero LSN instead.
1735 */
1736out_shutdown:
1737 spin_unlock(&cil->xc_push_lock);
1738 return 0;
71e330b5 1739}
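A condensed sketch of how a log force caller is expected to interpret this return value, distinguishing NULLCOMMITLSN (already stable, nothing to do) from the zero LSN returned on shutdown. The xlog_force_lsn() call and its argument list are assumptions based on the iclog force path, not verbatim code.

	xfs_lsn_t		lsn;

	lsn = xlog_cil_force_seq(log, seq);
	if (lsn == NULLCOMMITLSN)
		return 0;	/* checkpoint already on stable storage */

	/*
	 * A real commit lsn, or the zero lsn returned during shutdown, is
	 * passed on so the iclog state machine and its error handling run.
	 */
	return xlog_force_lsn(log, lsn, flags, log_flushed, false);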
ccf7c23f 1740
af1c2146
DC
1741/*
1742 * Move dead percpu state to the relevant CIL context structures.
1743 *
1744 * We have to lock the CIL context here to ensure that nothing is modifying
1745 * the percpu state, either addition or removal. Both of these are done under
1746 * the CIL context lock, so grabbing that exclusively here will ensure we can
1747 * safely drain the cilpcp for the CPU that is dying.
1748 */
1749void
1750xlog_cil_pcp_dead(
1751 struct xlog *log,
1752 unsigned int cpu)
1753{
1754 struct xfs_cil *cil = log->l_cilp;
7c8ade21 1755 struct xlog_cil_pcp *cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
1dd2a2c1 1756 struct xfs_cil_ctx *ctx;
af1c2146
DC
1757
1758 down_write(&cil->xc_ctx_lock);
1dd2a2c1
DC
1759 ctx = cil->xc_ctx;
1760 if (ctx->ticket)
1761 ctx->ticket->t_curr_res += cilpcp->space_reserved;
1762 cilpcp->space_reserved = 0;
1763
df7a4a21
DC
1764 if (!list_empty(&cilpcp->busy_extents))
1765 list_splice_init(&cilpcp->busy_extents, &ctx->busy_extents);
1dd2a2c1 1766 atomic_add(cilpcp->space_used, &ctx->space_used);
7c8ade21 1767 cilpcp->space_used = 0;
af1c2146
DC
1768 up_write(&cil->xc_ctx_lock);
1769}
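For reference, a simplified sketch of how this hook is wired up to CPU hotplug: a dead-CPU callback walks the mounted filesystems and drains the dying CPU's CIL state. The mount-list walk and its locking are elided, and the xfs_mount_list/m_mount_list names are assumptions for illustration.

static int
xfs_cpu_dead(
	unsigned int		cpu)
{
	struct xfs_mount	*mp;

	/* for each mounted filesystem, fold the dying CPU's CIL state back */
	list_for_each_entry(mp, &xfs_mount_list, m_mount_list)
		xlog_cil_pcp_dead(mp->m_log, cpu);
	return 0;
}

	/* registered once at module init time */
	cpuhp_setup_state_nocalls(CPUHP_XFS_DEAD, "xfs:dead", NULL,
			xfs_cpu_dead);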
1770
4c2d542f
DC
1771/*
1772 * Perform initial CIL structure initialisation.
1773 */
1774int
1775xlog_cil_init(
df7a4a21 1776 struct xlog *log)
4c2d542f 1777{
df7a4a21
DC
1778 struct xfs_cil *cil;
1779 struct xfs_cil_ctx *ctx;
1780 struct xlog_cil_pcp *cilpcp;
1781 int cpu;
4c2d542f 1782
707e0dda 1783 cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL);
4c2d542f 1784 if (!cil)
2451337d 1785 return -ENOMEM;
33c0dd78
DC
1786 /*
1787 * Limit the CIL pipeline depth to 4 concurrent work items to bound the
1788 * concurrency the log spinlocks will be exposed to.
1789 */
1790 cil->xc_push_wq = alloc_workqueue("xfs-cil/%s",
1791 XFS_WQFLAGS(WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_UNBOUND),
1792 4, log->l_mp->m_super->s_id);
1793 if (!cil->xc_push_wq)
1794 goto out_destroy_cil;
4c2d542f 1795
af1c2146
DC
1796 cil->xc_log = log;
1797 cil->xc_pcp = alloc_percpu(struct xlog_cil_pcp);
1798 if (!cil->xc_pcp)
1799 goto out_destroy_wq;
1800
df7a4a21
DC
1801 for_each_possible_cpu(cpu) {
1802 cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
1803 INIT_LIST_HEAD(&cilpcp->busy_extents);
1804 }
1805
4c2d542f
DC
1806 INIT_LIST_HEAD(&cil->xc_cil);
1807 INIT_LIST_HEAD(&cil->xc_committing);
1808 spin_lock_init(&cil->xc_cil_lock);
4bb928cd 1809 spin_lock_init(&cil->xc_push_lock);
c7f87f39 1810 init_waitqueue_head(&cil->xc_push_wait);
4c2d542f 1811 init_rwsem(&cil->xc_ctx_lock);
68a74dca 1812 init_waitqueue_head(&cil->xc_start_wait);
4c2d542f 1813 init_waitqueue_head(&cil->xc_commit_wait);
4c2d542f 1814 log->l_cilp = cil;
39823d0f
DC
1815
1816 ctx = xlog_cil_ctx_alloc();
1817 xlog_cil_ctx_switch(cil, ctx);
4c2d542f 1818 return 0;
33c0dd78 1819
af1c2146
DC
1820out_destroy_wq:
1821 destroy_workqueue(cil->xc_push_wq);
33c0dd78
DC
1822out_destroy_cil:
1823 kmem_free(cil);
1824 return -ENOMEM;
4c2d542f
DC
1825}
1826
1827void
1828xlog_cil_destroy(
f7bdf03a 1829 struct xlog *log)
4c2d542f 1830{
88591e7f
DC
1831 struct xfs_cil *cil = log->l_cilp;
1832
1833 if (cil->xc_ctx) {
1834 if (cil->xc_ctx->ticket)
1835 xfs_log_ticket_put(cil->xc_ctx->ticket);
1836 kmem_free(cil->xc_ctx);
4c2d542f
DC
1837 }
1838
88591e7f
DC
1839 ASSERT(list_empty(&cil->xc_cil));
1840 ASSERT(test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
af1c2146 1841 free_percpu(cil->xc_pcp);
88591e7f
DC
1842 destroy_workqueue(cil->xc_push_wq);
1843 kmem_free(cil);
4c2d542f
DC
1844}
1845