// SPDX-License-Identifier: GPL-2.0
/*
 * bcachefs journalling code, for btree insertions
 *
 * Copyright 2012 Google, Inc.
 */

#include "bcachefs.h"
#include "alloc.h"
#include "bkey_methods.h"
#include "btree_gc.h"
#include "buckets.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"
#include "journal_seq_blacklist.h"
#include "super-io.h"
#include "trace.h"

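/*
 * A journal entry is open while reservations can still be taken against it:
 * j->reservations.cur_entry_offset tracks how full the entry is (in u64s),
 * and the sentinel values JOURNAL_ENTRY_CLOSED_VAL/JOURNAL_ENTRY_ERROR_VAL
 * above it mark the entry as closed or the journal as errored.
 */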
static bool journal_entry_is_open(struct journal *j)
{
        return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
}

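/*
 * Slow path of bch2_journal_buf_put(): the previous (now closed) journal
 * buffer is no longer in use by anyone, so drop the pin it was created with
 * and issue the actual journal write.
 */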
void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
{
        struct journal_buf *w = journal_prev_buf(j);

        atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);

        if (!need_write_just_set &&
            test_bit(JOURNAL_NEED_WRITE, &j->flags))
                bch2_time_stats_update(j->delay_time,
                                       j->need_write_time);
#if 0
        closure_call(&j->io, bch2_journal_write, NULL, NULL);
#else
        /* Shut sparse up: */
        closure_init(&j->io, NULL);
        set_closure_fn(&j->io, bch2_journal_write, NULL);
        bch2_journal_write(&j->io);
#endif
}

static void journal_pin_new_entry(struct journal *j, int count)
{
        struct journal_entry_pin_list *p;

        /*
         * The fifo_push() needs to happen at the same time as j->seq is
         * incremented for journal_last_seq() to be calculated correctly
         */
        atomic64_inc(&j->seq);
        p = fifo_push_ref(&j->pin);

        INIT_LIST_HEAD(&p->list);
        INIT_LIST_HEAD(&p->flushed);
        atomic_set(&p->count, count);
        p->devs.nr = 0;
}

static void bch2_journal_buf_init(struct journal *j)
{
        struct journal_buf *buf = journal_cur_buf(j);

        memset(buf->has_inode, 0, sizeof(buf->has_inode));

        memset(buf->data, 0, sizeof(*buf->data));
        buf->data->seq  = cpu_to_le64(journal_cur_seq(j));
        buf->data->u64s = 0;
}

static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
{
        return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
}

static inline bool journal_entry_empty(struct jset *j)
{
        struct jset_entry *i;

        if (j->seq != j->last_seq)
                return false;

        vstruct_for_each(j, i)
                if (i->type || i->u64s)
                        return false;
        return true;
}

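/*
 * journal_buf_switch() closes the currently open journal entry (if any),
 * switches new reservations over to the other buffer, and drops the
 * reference that allows the write of the just-closed buffer to start.
 *
 * Called with j->lock held; returns JOURNAL_UNLOCKED if it dropped the lock.
 */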
static enum {
        JOURNAL_ENTRY_ERROR,
        JOURNAL_ENTRY_INUSE,
        JOURNAL_ENTRY_CLOSED,
        JOURNAL_UNLOCKED,
} journal_buf_switch(struct journal *j, bool need_write_just_set)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        struct journal_buf *buf;
        union journal_res_state old, new;
        u64 v = atomic64_read(&j->reservations.counter);

        lockdep_assert_held(&j->lock);

        do {
                old.v = new.v = v;
                if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
                        return JOURNAL_ENTRY_CLOSED;

                if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
                        return JOURNAL_ENTRY_ERROR;

                if (new.prev_buf_unwritten)
                        return JOURNAL_ENTRY_INUSE;

                /*
                 * avoid race between setting buf->data->u64s and
                 * journal_res_put starting write:
                 */
                journal_state_inc(&new);

                new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
                new.idx++;
                new.prev_buf_unwritten = 1;

                BUG_ON(journal_state_count(new, new.idx));
        } while ((v = atomic64_cmpxchg(&j->reservations.counter,
                                       old.v, new.v)) != old.v);

        clear_bit(JOURNAL_NEED_WRITE, &j->flags);

        buf = &j->buf[old.idx];
        buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);

        j->prev_buf_sectors =
                vstruct_blocks_plus(buf->data, c->block_bits,
                                    journal_entry_u64s_reserve(buf)) *
                c->opts.block_size;
        BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);

        bch2_journal_reclaim_fast(j);
        /* XXX: why set this here, and not in bch2_journal_write()? */
        buf->data->last_seq     = cpu_to_le64(journal_last_seq(j));

        if (journal_entry_empty(buf->data))
                clear_bit(JOURNAL_NOT_EMPTY, &j->flags);
        else
                set_bit(JOURNAL_NOT_EMPTY, &j->flags);

        journal_pin_new_entry(j, 1);

        bch2_journal_buf_init(j);

        cancel_delayed_work(&j->write_work);
        spin_unlock(&j->lock);

        if (c->bucket_journal_seq > 1 << 14) {
                c->bucket_journal_seq = 0;
                bch2_bucket_seq_cleanup(c);
        }

        c->bucket_journal_seq++;

        /* ugh - might be called from __journal_res_get() under wait_event() */
        __set_current_state(TASK_RUNNING);
        bch2_journal_buf_put(j, old.idx, need_write_just_set);

        return JOURNAL_UNLOCKED;
}

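/*
 * Put the journal into an error state: subsequent attempts to get a journal
 * reservation will fail, and anyone waiting on the current or previous
 * buffer is woken up.
 */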
void bch2_journal_halt(struct journal *j)
{
        union journal_res_state old, new;
        u64 v = atomic64_read(&j->reservations.counter);

        do {
                old.v = new.v = v;
                if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
                        return;

                new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
        } while ((v = atomic64_cmpxchg(&j->reservations.counter,
                                       old.v, new.v)) != old.v);

        journal_wake(j);
        closure_wake_up(&journal_cur_buf(j)->wait);
        closure_wake_up(&journal_prev_buf(j)->wait);
}

/*
 * should _only_ be called from journal_res_get() - when we actually want a
 * journal reservation - a journal entry being open means the journal is dirty:
 *
 * returns:
 * 1:           success
 * 0:           journal currently full (must wait)
 * -EROFS:      insufficient rw devices
 * -EIO:        journal error
 */
static int journal_entry_open(struct journal *j)
{
        struct journal_buf *buf = journal_cur_buf(j);
        union journal_res_state old, new;
        ssize_t u64s;
        int sectors;
        u64 v;

        lockdep_assert_held(&j->lock);
        BUG_ON(journal_entry_is_open(j));

        if (!fifo_free(&j->pin))
                return 0;

        sectors = bch2_journal_entry_sectors(j);
        if (sectors <= 0)
                return sectors;

        buf->disk_sectors       = sectors;

        sectors = min_t(unsigned, sectors, buf->size >> 9);
        j->cur_buf_sectors      = sectors;

        u64s = (sectors << 9) / sizeof(u64);

        /* Subtract the journal header */
        u64s -= sizeof(struct jset) / sizeof(u64);
        /*
         * Btree roots, prio pointers don't get added until right before we do
         * the write:
         */
        u64s -= journal_entry_u64s_reserve(buf);
        u64s  = max_t(ssize_t, 0L, u64s);

        BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);

        if (u64s <= le32_to_cpu(buf->data->u64s))
                return 0;

        /*
         * Must be set before marking the journal entry as open:
         */
        j->cur_entry_u64s = u64s;

        v = atomic64_read(&j->reservations.counter);
        do {
                old.v = new.v = v;

                if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
                        return -EIO;

                /* Handle any already added entries */
                new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
        } while ((v = atomic64_cmpxchg(&j->reservations.counter,
                                       old.v, new.v)) != old.v);

        if (j->res_get_blocked_start)
                bch2_time_stats_update(j->blocked_time,
                                       j->res_get_blocked_start);
        j->res_get_blocked_start = 0;

        mod_delayed_work(system_freezable_wq,
                         &j->write_work,
                         msecs_to_jiffies(j->write_delay_ms));
        journal_wake(j);
        return 1;
}

/*
 * returns true if there's nothing to flush and no journal write still in flight
 */
static bool journal_flush_write(struct journal *j)
{
        bool ret;

        spin_lock(&j->lock);
        ret = !j->reservations.prev_buf_unwritten;

        if (!journal_entry_is_open(j)) {
                spin_unlock(&j->lock);
                return ret;
        }

        set_bit(JOURNAL_NEED_WRITE, &j->flags);
        if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
                ret = false;
        else
                spin_unlock(&j->lock);
        return ret;
}

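/*
 * Delayed work that flushes the currently open journal entry: armed for
 * j->write_delay_ms by journal_entry_open() via mod_delayed_work().
 */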
static void journal_write_work(struct work_struct *work)
{
        struct journal *j = container_of(work, struct journal, write_work.work);

        journal_flush_write(j);
}

/*
 * Given an inode number, if that inode number has data in the journal that
 * hasn't yet been flushed, return the journal sequence number that needs to be
 * flushed:
 */
u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
{
        size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
        u64 seq = 0;

        if (!test_bit(h, j->buf[0].has_inode) &&
            !test_bit(h, j->buf[1].has_inode))
                return 0;

        spin_lock(&j->lock);
        if (test_bit(h, journal_cur_buf(j)->has_inode))
                seq = journal_cur_seq(j);
        else if (test_bit(h, journal_prev_buf(j)->has_inode))
                seq = journal_cur_seq(j) - 1;
        spin_unlock(&j->lock);

        return seq;
}

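/*
 * Slow path for getting a journal reservation: returns 1 on success, 0 if
 * the journal is full and the caller must wait, or a negative error code.
 */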
static int __journal_res_get(struct journal *j, struct journal_res *res,
                              unsigned u64s_min, unsigned u64s_max)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        struct journal_buf *buf;
        int ret;
retry:
        ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
        if (ret)
                return ret;

        spin_lock(&j->lock);
        /*
         * Recheck after taking the lock, so we don't race with another thread
         * that just did journal_entry_open() and call journal_entry_close()
         * unnecessarily
         */
        ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
        if (ret) {
                spin_unlock(&j->lock);
                return 1;
        }

        /*
         * If we couldn't get a reservation because the current buf filled up,
         * and we had room for a bigger entry on disk, signal that we want to
         * realloc the journal bufs:
         */
        buf = journal_cur_buf(j);
        if (journal_entry_is_open(j) &&
            buf->size >> 9 < buf->disk_sectors &&
            buf->size < JOURNAL_ENTRY_SIZE_MAX)
                j->buf_size_want = max(j->buf_size_want, buf->size << 1);

        /*
         * Close the current journal entry if necessary, then try to start a new
         * one:
         */
        switch (journal_buf_switch(j, false)) {
        case JOURNAL_ENTRY_ERROR:
                spin_unlock(&j->lock);
                return -EROFS;
        case JOURNAL_ENTRY_INUSE:
                /* haven't finished writing out the previous one: */
                spin_unlock(&j->lock);
                trace_journal_entry_full(c);
                goto blocked;
        case JOURNAL_ENTRY_CLOSED:
                break;
        case JOURNAL_UNLOCKED:
                goto retry;
        }

        /* We now have a new, closed journal buf - see if we can open it: */
        ret = journal_entry_open(j);
        spin_unlock(&j->lock);

        if (ret < 0)
                return ret;
        if (ret)
                goto retry;

        /* Journal's full, we have to wait */

        /*
         * Direct reclaim - can't rely on reclaim from work item
         * due to freezing..
         */
        bch2_journal_reclaim_work(&j->reclaim_work.work);

        trace_journal_full(c);
blocked:
        if (!j->res_get_blocked_start)
                j->res_get_blocked_start = local_clock() ?: 1;
        return 0;
}

/*
 * Essentially the entry point to the journalling code. When bcachefs is doing
 * a btree insert, it calls this function to get the current journal write.
 * The journal write is the structure used to set up journal writes; the
 * calling function then adds its keys to the structure, queuing them for the
 * next write.
 *
 * To ensure forward progress, the current task must not be holding any
 * btree node write locks.
 */
int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
                                 unsigned u64s_min, unsigned u64s_max)
{
        int ret;

        wait_event(j->wait,
                   (ret = __journal_res_get(j, res, u64s_min,
                                            u64s_max)));
        return ret < 0 ? ret : 0;
}

u64 bch2_journal_last_unwritten_seq(struct journal *j)
{
        u64 seq;

        spin_lock(&j->lock);
        seq = journal_cur_seq(j);
        if (j->reservations.prev_buf_unwritten)
                seq--;
        spin_unlock(&j->lock);

        return seq;
}

/**
 * bch2_journal_open_seq_async - try to open a new journal entry if @seq isn't
 * open yet, or wait if we cannot
 *
 * used by the btree interior update machinery, when it needs to write a new
 * btree root - every journal entry contains the roots of all the btrees, so it
 * doesn't need to bother with getting a journal reservation
 */
int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
{
        int ret;

        spin_lock(&j->lock);
        BUG_ON(seq > journal_cur_seq(j));

        if (seq < journal_cur_seq(j) ||
            journal_entry_is_open(j)) {
                spin_unlock(&j->lock);
                return 1;
        }

        ret = journal_entry_open(j);
        if (!ret)
                closure_wait(&j->async_wait, parent);
        spin_unlock(&j->lock);

        if (!ret)
                bch2_journal_reclaim_work(&j->reclaim_work.work);

        return ret;
}

/**
 * bch2_journal_wait_on_seq - wait for a journal entry to be written
 *
 * does _not_ cause @seq to be written immediately - if there is no other
 * activity to cause the relevant journal entry to be filled up or flushed it
 * can wait for an arbitrary amount of time (up to @j->write_delay_ms, which is
 * configurable).
 */
void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
{
        spin_lock(&j->lock);

        BUG_ON(seq > journal_cur_seq(j));

        if (bch2_journal_error(j)) {
                spin_unlock(&j->lock);
                return;
        }

        if (seq == journal_cur_seq(j)) {
                if (!closure_wait(&journal_cur_buf(j)->wait, parent))
                        BUG();
        } else if (seq + 1 == journal_cur_seq(j) &&
                   j->reservations.prev_buf_unwritten) {
                if (!closure_wait(&journal_prev_buf(j)->wait, parent))
                        BUG();

                smp_mb();

                /* check if raced with write completion (or failure) */
                if (!j->reservations.prev_buf_unwritten ||
                    bch2_journal_error(j))
                        closure_wake_up(&journal_prev_buf(j)->wait);
        }

        spin_unlock(&j->lock);
}

/**
 * bch2_journal_flush_seq_async - wait for a journal entry to be written
 *
 * like bch2_journal_wait_on_seq, except that it triggers a write immediately if
 * necessary
 */
void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
{
        struct journal_buf *buf;

        spin_lock(&j->lock);

        BUG_ON(seq > journal_cur_seq(j));

        if (bch2_journal_error(j)) {
                spin_unlock(&j->lock);
                return;
        }

        if (seq == journal_cur_seq(j)) {
                bool set_need_write = false;

                buf = journal_cur_buf(j);

                if (parent && !closure_wait(&buf->wait, parent))
                        BUG();

                if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
                        j->need_write_time = local_clock();
                        set_need_write = true;
                }

                switch (journal_buf_switch(j, set_need_write)) {
                case JOURNAL_ENTRY_ERROR:
                        if (parent)
                                closure_wake_up(&buf->wait);
                        break;
                case JOURNAL_ENTRY_CLOSED:
                        /*
                         * Journal entry hasn't been opened yet, but caller
                         * claims it has something
                         */
                        BUG();
                case JOURNAL_ENTRY_INUSE:
                        break;
                case JOURNAL_UNLOCKED:
                        return;
                }
        } else if (parent &&
                   seq + 1 == journal_cur_seq(j) &&
                   j->reservations.prev_buf_unwritten) {
                buf = journal_prev_buf(j);

                if (!closure_wait(&buf->wait, parent))
                        BUG();

                smp_mb();

                /* check if raced with write completion (or failure) */
                if (!j->reservations.prev_buf_unwritten ||
                    bch2_journal_error(j))
                        closure_wake_up(&buf->wait);
        }

        spin_unlock(&j->lock);
}

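/*
 * Returns nonzero once journal entry @seq has been written (or a negative
 * error code if the journal is in an error state), 0 if the caller must keep
 * waiting - used as the wait_event() condition in bch2_journal_flush_seq().
 */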
static int journal_seq_flushed(struct journal *j, u64 seq)
{
        struct journal_buf *buf;
        int ret = 1;

        spin_lock(&j->lock);
        BUG_ON(seq > journal_cur_seq(j));

        if (seq == journal_cur_seq(j)) {
                bool set_need_write = false;

                ret = 0;

                buf = journal_cur_buf(j);

                if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
                        j->need_write_time = local_clock();
                        set_need_write = true;
                }

                switch (journal_buf_switch(j, set_need_write)) {
                case JOURNAL_ENTRY_ERROR:
                        ret = -EIO;
                        break;
                case JOURNAL_ENTRY_CLOSED:
                        /*
                         * Journal entry hasn't been opened yet, but caller
                         * claims it has something
                         */
                        BUG();
                case JOURNAL_ENTRY_INUSE:
                        break;
                case JOURNAL_UNLOCKED:
                        return 0;
                }
        } else if (seq + 1 == journal_cur_seq(j) &&
                   j->reservations.prev_buf_unwritten) {
                ret = bch2_journal_error(j);
        }

        spin_unlock(&j->lock);

        return ret;
}

int bch2_journal_flush_seq(struct journal *j, u64 seq)
{
        u64 start_time = local_clock();
        int ret, ret2;

        ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));

        bch2_time_stats_update(j->flush_seq_time, start_time);

        return ret ?: ret2 < 0 ? ret2 : 0;
}

/**
 * bch2_journal_meta_async - force a journal entry to be written
 */
void bch2_journal_meta_async(struct journal *j, struct closure *parent)
{
        struct journal_res res;
        unsigned u64s = jset_u64s(0);

        memset(&res, 0, sizeof(res));

        bch2_journal_res_get(j, &res, u64s, u64s);
        bch2_journal_res_put(j, &res);

        bch2_journal_flush_seq_async(j, res.seq, parent);
}

int bch2_journal_meta(struct journal *j)
{
        struct journal_res res;
        unsigned u64s = jset_u64s(0);
        int ret;

        memset(&res, 0, sizeof(res));

        ret = bch2_journal_res_get(j, &res, u64s, u64s);
        if (ret)
                return ret;

        bch2_journal_res_put(j, &res);

        return bch2_journal_flush_seq(j, res.seq);
}

/*
 * bch2_journal_flush_async - if there is an open journal entry, or a journal
 * entry still being written, write it and wait for the write to complete
 */
void bch2_journal_flush_async(struct journal *j, struct closure *parent)
{
        u64 seq, journal_seq;

        spin_lock(&j->lock);
        journal_seq = journal_cur_seq(j);

        if (journal_entry_is_open(j)) {
                seq = journal_seq;
        } else if (journal_seq) {
                seq = journal_seq - 1;
        } else {
                spin_unlock(&j->lock);
                return;
        }
        spin_unlock(&j->lock);

        bch2_journal_flush_seq_async(j, seq, parent);
}

int bch2_journal_flush(struct journal *j)
{
        u64 seq, journal_seq;

        spin_lock(&j->lock);
        journal_seq = journal_cur_seq(j);

        if (journal_entry_is_open(j)) {
                seq = journal_seq;
        } else if (journal_seq) {
                seq = journal_seq - 1;
        } else {
                spin_unlock(&j->lock);
                return 0;
        }
        spin_unlock(&j->lock);

        return bch2_journal_flush_seq(j, seq);
}

/* allocate journal on a device: */

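/*
 * Grow a device's journal to @nr buckets: allocates the new buckets, resizes
 * the superblock journal field and the in-memory arrays, and marks the new
 * buckets as journal metadata. Shrinking is not handled yet.
 */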
static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
                                         bool new_fs, struct closure *cl)
{
        struct bch_fs *c = ca->fs;
        struct journal_device *ja = &ca->journal;
        struct bch_sb_field_journal *journal_buckets;
        u64 *new_bucket_seq = NULL, *new_buckets = NULL;
        int ret = 0;

        /* don't handle reducing nr of buckets yet: */
        if (nr <= ja->nr)
                return 0;

        ret = -ENOMEM;
        new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
        new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
        if (!new_buckets || !new_bucket_seq)
                goto err;

        journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
                                nr + sizeof(*journal_buckets) / sizeof(u64));
        if (!journal_buckets)
                goto err;

        if (c)
                spin_lock(&c->journal.lock);

        memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
        memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
        swap(new_buckets,       ja->buckets);
        swap(new_bucket_seq,    ja->bucket_seq);

        if (c)
                spin_unlock(&c->journal.lock);

        while (ja->nr < nr) {
                struct open_bucket *ob = NULL;
                long bucket;

                if (new_fs) {
                        percpu_down_read(&c->usage_lock);
                        bucket = bch2_bucket_alloc_new_fs(ca);
                        percpu_up_read(&c->usage_lock);

                        if (bucket < 0) {
                                ret = -ENOSPC;
                                goto err;
                        }
                } else {
                        int ob_idx = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, false, cl);
                        if (ob_idx < 0) {
                                ret = cl ? -EAGAIN : -ENOSPC;
                                goto err;
                        }

                        ob = c->open_buckets + ob_idx;
                        bucket = sector_to_bucket(ca, ob->ptr.offset);
                }

                if (c) {
                        percpu_down_read(&c->usage_lock);
                        spin_lock(&c->journal.lock);
                }

                __array_insert_item(ja->buckets,                ja->nr, ja->last_idx);
                __array_insert_item(ja->bucket_seq,             ja->nr, ja->last_idx);
                __array_insert_item(journal_buckets->buckets,   ja->nr, ja->last_idx);

                ja->buckets[ja->last_idx] = bucket;
                ja->bucket_seq[ja->last_idx] = 0;
                journal_buckets->buckets[ja->last_idx] = cpu_to_le64(bucket);

                if (ja->last_idx < ja->nr) {
                        if (ja->cur_idx >= ja->last_idx)
                                ja->cur_idx++;
                        ja->last_idx++;
                }
                ja->nr++;

                bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
                                ca->mi.bucket_size,
                                gc_phase(GC_PHASE_SB),
                                new_fs
                                ? BCH_BUCKET_MARK_MAY_MAKE_UNAVAILABLE
                                : 0);

                if (c) {
                        spin_unlock(&c->journal.lock);
                        percpu_up_read(&c->usage_lock);
                }

                if (!new_fs)
                        bch2_open_bucket_put(c, ob);
        }

        ret = 0;
err:
        kfree(new_bucket_seq);
        kfree(new_buckets);

        return ret;
}

/*
 * Allocate more journal space at runtime - not currently making use of it, but
 * the code works:
 */
int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
                                unsigned nr)
{
        struct journal_device *ja = &ca->journal;
        struct closure cl;
        unsigned current_nr;
        int ret;

        closure_init_stack(&cl);

        do {
                struct disk_reservation disk_res = { 0, 0 };

                closure_sync(&cl);

                mutex_lock(&c->sb_lock);
                current_nr = ja->nr;

                /*
                 * note: journal buckets aren't really counted as _sectors_ used yet, so
                 * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
                 * when space used goes up without a reservation - but we do need the
                 * reservation to ensure we'll actually be able to allocate:
                 */

                if (bch2_disk_reservation_get(c, &disk_res,
                                bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
                        mutex_unlock(&c->sb_lock);
                        return -ENOSPC;
                }

                ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);

                bch2_disk_reservation_put(c, &disk_res);

                if (ja->nr != current_nr)
                        bch2_write_super(c);
                mutex_unlock(&c->sb_lock);
        } while (ret == -EAGAIN);

        return ret;
}

int bch2_dev_journal_alloc(struct bch_dev *ca)
{
        unsigned nr;

        if (dynamic_fault("bcachefs:add:journal_alloc"))
                return -ENOMEM;

        /*
         * clamp journal size to 1024 buckets, or 512MB worth of buckets,
         * whichever is smaller:
         */
        nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
                     BCH_JOURNAL_BUCKETS_MIN,
                     min(1 << 10,
                         (1 << 20) / ca->mi.bucket_size));

        return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
}

/* startup/shutdown: */

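/*
 * Returns true if the journal write currently in flight is going to
 * @dev_idx - used by bch2_dev_journal_stop() to wait until the device is no
 * longer being written to.
 */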
static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
{
        union journal_res_state state;
        struct journal_buf *w;
        bool ret;

        spin_lock(&j->lock);
        state = READ_ONCE(j->reservations);
        w = j->buf + !state.idx;

        ret = state.prev_buf_unwritten &&
                bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), dev_idx);
        spin_unlock(&j->lock);

        return ret;
}

void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
{
        spin_lock(&j->lock);
        bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx);
        spin_unlock(&j->lock);

        wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
}

void bch2_fs_journal_stop(struct journal *j)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);

        wait_event(j->wait, journal_flush_write(j));

        /* do we need to write another journal entry? */
        if (test_bit(JOURNAL_NOT_EMPTY, &j->flags) ||
            c->btree_roots_dirty)
                bch2_journal_meta(j);

        BUG_ON(!bch2_journal_error(j) &&
               test_bit(JOURNAL_NOT_EMPTY, &j->flags));

        cancel_delayed_work_sync(&j->write_work);
        cancel_delayed_work_sync(&j->reclaim_work);
}

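/*
 * Start the journal: advance j->seq past any blacklisted sequence numbers,
 * then pin and initialize the first journal entry we'll write to.
 */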
void bch2_fs_journal_start(struct journal *j)
{
        struct journal_seq_blacklist *bl;
        u64 blacklist = 0;

        list_for_each_entry(bl, &j->seq_blacklist, list)
                blacklist = max(blacklist, bl->end);

        spin_lock(&j->lock);

        set_bit(JOURNAL_STARTED, &j->flags);

        while (journal_cur_seq(j) < blacklist)
                journal_pin_new_entry(j, 0);

        /*
         * journal_buf_switch() only inits the next journal entry when it
         * closes an open journal entry - the very first journal entry gets
         * initialized here:
         */
        journal_pin_new_entry(j, 1);
        bch2_journal_buf_init(j);

        spin_unlock(&j->lock);

        /*
         * Adding entries to the next journal entry before allocating space on
         * disk for the next journal entry - this is ok, because these entries
         * only have to go down with the next journal entry we write:
         */
        bch2_journal_seq_blacklist_write(j);

        queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
}

/* init/exit: */

void bch2_dev_journal_exit(struct bch_dev *ca)
{
        kfree(ca->journal.bio);
        kfree(ca->journal.buckets);
        kfree(ca->journal.bucket_seq);

        ca->journal.bio         = NULL;
        ca->journal.buckets     = NULL;
        ca->journal.bucket_seq  = NULL;
}

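/*
 * Read one device's journal layout from its superblock: copy out the bucket
 * list and allocate the per-device bio and bucket_seq array.
 */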
int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
{
        struct journal_device *ja = &ca->journal;
        struct bch_sb_field_journal *journal_buckets =
                bch2_sb_get_journal(sb);
        unsigned i, nr_bvecs;

        ja->nr = bch2_nr_journal_buckets(journal_buckets);

        ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
        if (!ja->bucket_seq)
                return -ENOMEM;

        nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE);

        ca->journal.bio = bio_kmalloc(nr_bvecs, GFP_KERNEL);
        if (!ca->journal.bio)
                return -ENOMEM;

        bio_init(ca->journal.bio, NULL, ca->journal.bio->bi_inline_vecs, nr_bvecs, 0);

        ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
        if (!ja->buckets)
                return -ENOMEM;

        for (i = 0; i < ja->nr; i++)
                ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);

        return 0;
}

void bch2_fs_journal_exit(struct journal *j)
{
        kvpfree(j->buf[1].data, j->buf[1].size);
        kvpfree(j->buf[0].data, j->buf[0].size);
        free_fifo(&j->pin);
}

int bch2_fs_journal_init(struct journal *j)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        static struct lock_class_key res_key;
        int ret = 0;

        pr_verbose_init(c->opts, "");

        spin_lock_init(&j->lock);
        spin_lock_init(&j->err_lock);
        init_waitqueue_head(&j->wait);
        INIT_DELAYED_WORK(&j->write_work, journal_write_work);
        INIT_DELAYED_WORK(&j->reclaim_work, bch2_journal_reclaim_work);
        mutex_init(&j->blacklist_lock);
        INIT_LIST_HEAD(&j->seq_blacklist);
        mutex_init(&j->reclaim_lock);

        lockdep_init_map(&j->res_map, "journal res", &res_key, 0);

        j->buf[0].size          = JOURNAL_ENTRY_SIZE_MIN;
        j->buf[1].size          = JOURNAL_ENTRY_SIZE_MIN;
        j->write_delay_ms       = 1000;
        j->reclaim_delay_ms     = 100;

        bkey_extent_init(&j->key);

        atomic64_set(&j->reservations.counter,
                ((union journal_res_state)
                 { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);

        if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
            !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
            !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) {
                ret = -ENOMEM;
                goto out;
        }

        j->pin.front = j->pin.back = 1;
out:
        pr_verbose_init(c->opts, "ret %i", ret);
        return ret;
}

/* debug: */

ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        union journal_res_state *s = &j->reservations;
        struct bch_dev *ca;
        unsigned iter;
        ssize_t ret = 0;

        rcu_read_lock();
        spin_lock(&j->lock);

        ret += scnprintf(buf + ret, PAGE_SIZE - ret,
                         "active journal entries:\t%llu\n"
                         "seq:\t\t\t%llu\n"
                         "last_seq:\t\t%llu\n"
                         "last_seq_ondisk:\t%llu\n"
                         "reservation count:\t%u\n"
                         "reservation offset:\t%u\n"
                         "current entry u64s:\t%u\n"
                         "io in flight:\t\t%i\n"
                         "need write:\t\t%i\n"
                         "dirty:\t\t\t%i\n"
                         "replay done:\t\t%i\n",
                         fifo_used(&j->pin),
                         journal_cur_seq(j),
                         journal_last_seq(j),
                         j->last_seq_ondisk,
                         journal_state_count(*s, s->idx),
                         s->cur_entry_offset,
                         j->cur_entry_u64s,
                         s->prev_buf_unwritten,
                         test_bit(JOURNAL_NEED_WRITE,   &j->flags),
                         journal_entry_is_open(j),
                         test_bit(JOURNAL_REPLAY_DONE,  &j->flags));

        for_each_member_device_rcu(ca, c, iter,
                                   &c->rw_devs[BCH_DATA_JOURNAL]) {
                struct journal_device *ja = &ca->journal;

                if (!ja->nr)
                        continue;

                ret += scnprintf(buf + ret, PAGE_SIZE - ret,
                                 "dev %u:\n"
                                 "\tnr\t\t%u\n"
                                 "\tcur_idx\t\t%u (seq %llu)\n"
                                 "\tlast_idx\t%u (seq %llu)\n",
                                 iter, ja->nr,
                                 ja->cur_idx,   ja->bucket_seq[ja->cur_idx],
                                 ja->last_idx,  ja->bucket_seq[ja->last_idx]);
        }

        spin_unlock(&j->lock);
        rcu_read_unlock();

        return ret;
}

ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
{
        struct journal_entry_pin_list *pin_list;
        struct journal_entry_pin *pin;
        ssize_t ret = 0;
        u64 i;

        spin_lock(&j->lock);
        fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
                ret += scnprintf(buf + ret, PAGE_SIZE - ret,
                                 "%llu: count %u\n",
                                 i, atomic_read(&pin_list->count));

                list_for_each_entry(pin, &pin_list->list, list)
                        ret += scnprintf(buf + ret, PAGE_SIZE - ret,
                                         "\t%p %pf\n",
                                         pin, pin->flush);

                if (!list_empty(&pin_list->flushed))
                        ret += scnprintf(buf + ret, PAGE_SIZE - ret,
                                         "flushed:\n");

                list_for_each_entry(pin, &pin_list->flushed, list)
                        ret += scnprintf(buf + ret, PAGE_SIZE - ret,
                                         "\t%p %pf\n",
                                         pin, pin->flush);
        }
        spin_unlock(&j->lock);

        return ret;
}