dm vdo index: fix various small nits
drivers/md/dm-vdo/index.c
// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "index.h"

#include "funnel-requestqueue.h"
#include "hash-utils.h"
#include "logger.h"
#include "memory-alloc.h"
#include "sparse-cache.h"

static const u64 NO_LAST_SAVE = U64_MAX;

/*
 * When searching for deduplication records, the index first searches the volume index, and then
 * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
 * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
 * committed (either the open chapter or a recently closed one), the index searches the in-memory
 * representation of the chapter. Finally, if the volume index does not find a record and the index
 * is sparse, the index will search the sparse cache.
 *
 * The index sends two kinds of messages to coordinate between zones: chapter close messages for
 * the chapter writer, and sparse cache barrier messages for the sparse cache.
 *
 * The chapter writer is responsible for committing chapters of records to storage. Since zones can
 * get different numbers of records, some zones may fall behind others. Each time a zone fills up
 * its available space in a chapter, it informs the chapter writer that the chapter is complete,
 * and also informs all other zones that it has closed the chapter. Each other zone will then close
 * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
 * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
 *
 * The last zone to close the chapter also removes the oldest chapter from the volume index.
 * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
 * means that those zones will never ask the volume index about it. No zone is allowed to get more
 * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
 * chapter before the previous one has been closed by all zones, it is forced to wait.
 *
 * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
 * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
 * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
 * paused its operations, the cache membership is changed and each zone is then informed that it
 * can proceed. More details can be found in the sparse cache documentation.
 *
 * If a sparse index has only one zone, it will not create a triage queue, but it still needs the
 * barrier message to change the sparse cache membership, so the index simulates the message by
 * invoking the handler directly.
 */
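
/*
 * For illustration only (a worked example, not part of the original comments):
 * a concrete timeline of the chapter close protocol with three zones, where
 * zone 2 happens to fill chapter 41 first.
 *
 *	zone 2:  fills chapter 41, submits it to the chapter writer, and sends
 *		 a chapter close message to zones 0 and 1
 *	zone 0:  closes chapter 41 immediately, however full it is, and
 *		 submits it to the writer
 *	zone 1:  does the same
 *	writer:  sees that all zones have submitted, commits chapter 41 to
 *		 storage, and wakes any zone waiting to open chapter 42
 */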

struct chapter_writer {
	/* The index to which we belong */
	struct uds_index *index;
	/* The thread to do the writing */
	struct thread *thread;
	/* The lock protecting the following fields */
	struct mutex mutex;
	/* The condition signalled on state changes */
	struct cond_var cond;
	/* Set to true to stop the thread */
	bool stop;
	/* The result from the most recent write */
	int result;
	/* The number of bytes allocated by the chapter writer */
	size_t memory_size;
	/* The number of zones which have submitted a chapter for writing */
	unsigned int zones_to_write;
	/* Open chapter index used by uds_close_open_chapter() */
	struct open_chapter_index *open_chapter_index;
	/* Collated records used by uds_close_open_chapter() */
	struct uds_volume_record *collated_records;
	/* The chapters to write (one per zone) */
	struct open_chapter_zone *chapters[];
};

static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
{
	return uds_is_chapter_sparse(zone->index->volume->geometry,
				     zone->oldest_virtual_chapter,
				     zone->newest_virtual_chapter, virtual_chapter);
}

static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
			       struct uds_index *index)
{
	int result;
	struct uds_request *request;

	result = uds_allocate(1, struct uds_request, __func__, &request);
	if (result != UDS_SUCCESS)
		return result;

	request->index = index;
	request->unbatched = true;
	request->zone_number = zone;
	request->zone_message = message;

	uds_enqueue_request(request, STAGE_MESSAGE);
	return UDS_SUCCESS;
}

static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
{
	struct uds_zone_message message = {
		.type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
		.virtual_chapter = virtual_chapter,
	};
	unsigned int zone;

	for (zone = 0; zone < index->zone_count; zone++) {
		int result = launch_zone_message(message, zone, index);

		ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
	}
}

/*
 * Determine whether this request should trigger a sparse cache barrier message to change the
 * membership of the sparse cache. If a change in membership is desired, the function returns the
 * chapter number to add.
 */
static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
{
	u64 virtual_chapter;
	struct index_zone *zone;

	virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
						       &request->record_name);
	if (virtual_chapter == NO_CHAPTER)
		return NO_CHAPTER;

	zone = index->zones[request->zone_number];
	if (!is_zone_chapter_sparse(zone, virtual_chapter))
		return NO_CHAPTER;

	/*
	 * FIXME: Optimize for a common case by remembering the chapter from the most recent
	 * barrier message and skipping this chapter if it is the same.
	 */

	return virtual_chapter;
}

/*
 * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
 * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
 * of index does nothing here.
 */
static int simulate_index_zone_barrier_message(struct index_zone *zone,
					       struct uds_request *request)
{
	u64 sparse_virtual_chapter;

	if ((zone->index->zone_count > 1) ||
	    !uds_is_sparse_geometry(zone->index->volume->geometry))
		return UDS_SUCCESS;

	sparse_virtual_chapter = triage_index_request(zone->index, request);
	if (sparse_virtual_chapter == NO_CHAPTER)
		return UDS_SUCCESS;

	return uds_update_sparse_cache(zone, sparse_virtual_chapter);
}

/* This is the request processing function for the triage queue. */
static void triage_request(struct uds_request *request)
{
	struct uds_index *index = request->index;
	u64 sparse_virtual_chapter = triage_index_request(index, request);

	if (sparse_virtual_chapter != NO_CHAPTER)
		enqueue_barrier_messages(index, sparse_virtual_chapter);

	uds_enqueue_request(request, STAGE_INDEX);
}

static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
{
	int result;
	struct chapter_writer *writer = index->chapter_writer;

	uds_lock_mutex(&writer->mutex);
	while (index->newest_virtual_chapter < current_chapter_number)
		uds_wait_cond(&writer->cond, &writer->mutex);
	result = writer->result;
	uds_unlock_mutex(&writer->mutex);

	if (result != UDS_SUCCESS)
		return uds_log_error_strerror(result,
					      "Writing of previous open chapter failed");

	return UDS_SUCCESS;
}

static int swap_open_chapter(struct index_zone *zone)
{
	int result;
	struct open_chapter_zone *temporary_chapter;

	result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
	if (result != UDS_SUCCESS)
		return result;

	temporary_chapter = zone->open_chapter;
	zone->open_chapter = zone->writing_chapter;
	zone->writing_chapter = temporary_chapter;
	return UDS_SUCCESS;
}

/*
 * Inform the chapter writer that this zone is done with this chapter. The chapter won't start
 * writing until all zones have closed it.
 */
static unsigned int start_closing_chapter(struct uds_index *index,
					  unsigned int zone_number,
					  struct open_chapter_zone *chapter)
{
	unsigned int finished_zones;
	struct chapter_writer *writer = index->chapter_writer;

	uds_lock_mutex(&writer->mutex);
	finished_zones = ++writer->zones_to_write;
	writer->chapters[zone_number] = chapter;
	uds_broadcast_cond(&writer->cond);
	uds_unlock_mutex(&writer->mutex);

	return finished_zones;
}

static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
{
	int result;
	unsigned int i;
	struct uds_zone_message zone_message = {
		.type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
		.virtual_chapter = closed_chapter,
	};

	for (i = 0; i < zone->index->zone_count; i++) {
		if (zone->id == i)
			continue;

		result = launch_zone_message(zone_message, i, zone->index);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

static int open_next_chapter(struct index_zone *zone)
{
	int result;
	u64 closed_chapter;
	u64 expiring;
	unsigned int finished_zones;
	u32 expire_chapters;

	uds_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
		      (unsigned long long) zone->newest_virtual_chapter, zone->id,
		      zone->open_chapter->size,
		      zone->open_chapter->capacity - zone->open_chapter->size);

	result = swap_open_chapter(zone);
	if (result != UDS_SUCCESS)
		return result;

	closed_chapter = zone->newest_virtual_chapter++;
	uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
					       zone->newest_virtual_chapter);
	uds_reset_open_chapter(zone->open_chapter);

	finished_zones = start_closing_chapter(zone->index, zone->id,
					       zone->writing_chapter);
	if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
		result = announce_chapter_closed(zone, closed_chapter);
		if (result != UDS_SUCCESS)
			return result;
	}

	expiring = zone->oldest_virtual_chapter;
	expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
						 zone->newest_virtual_chapter);
	zone->oldest_virtual_chapter += expire_chapters;

	if (finished_zones < zone->index->zone_count)
		return UDS_SUCCESS;

	while (expire_chapters-- > 0)
		uds_forget_chapter(zone->index->volume, expiring++);

	return UDS_SUCCESS;
}
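
/*
 * Illustrative example (derived from the function above, not an original
 * comment): in the common dense case where closing one chapter expires
 * exactly one chapter, each zone that finishes chapter N advances its window
 * from [oldest, N] to [oldest + 1, N + 1], but only the last zone to finish
 * actually forgets the expired chapter in the volume, since by then no zone
 * can still ask about it.
 */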

static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
{
	if (zone->newest_virtual_chapter == virtual_chapter)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

static int dispatch_index_zone_control_request(struct uds_request *request)
{
	struct uds_zone_message *message = &request->zone_message;
	struct index_zone *zone = request->index->zones[request->zone_number];

	switch (message->type) {
	case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
		return uds_update_sparse_cache(zone, message->virtual_chapter);

	case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
		return handle_chapter_closed(zone, message->virtual_chapter);

	default:
		uds_log_error("invalid message type: %d", message->type);
		return UDS_INVALID_ARGUMENT;
	}
}

static void set_request_location(struct uds_request *request,
				 enum uds_index_region new_location)
{
	request->location = new_location;
	request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
			  (new_location == UDS_LOCATION_IN_DENSE) ||
			  (new_location == UDS_LOCATION_IN_SPARSE));
}

static void set_chapter_location(struct uds_request *request,
				 const struct index_zone *zone, u64 virtual_chapter)
{
	request->found = true;
	if (virtual_chapter == zone->newest_virtual_chapter)
		request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
	else if (is_zone_chapter_sparse(zone, virtual_chapter))
		request->location = UDS_LOCATION_IN_SPARSE;
	else
		request->location = UDS_LOCATION_IN_DENSE;
}

static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
				       u64 virtual_chapter, bool *found)
{
	int result;
	struct volume *volume;
	u16 record_page_number;
	u32 chapter;

	result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
					 &record_page_number);
	if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
		return result;

	request->virtual_chapter = virtual_chapter;
	volume = zone->index->volume;
	chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
	return uds_search_cached_record_page(volume, request, chapter,
					     record_page_number, found);
}
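
/*
 * Illustrative note (an assumption for the example, not guaranteed by this
 * file): the volume holds a fixed ring of chapters, so the virtual-to-physical
 * mapping above behaves like a simple modulus. If so, with
 * chapters_per_volume = 1024, virtual chapters 42, 1066, and 2090 would all
 * occupy physical chapter 42:
 *
 *	physical = virtual % geometry->chapters_per_volume;
 */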

static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
				bool *found)
{
	struct volume *volume;

	if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
		*found = true;
		return UDS_SUCCESS;
	} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
		*found = false;
		return UDS_SUCCESS;
	}

	if (request->virtual_chapter == zone->newest_virtual_chapter) {
		uds_search_open_chapter(zone->open_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	if ((zone->newest_virtual_chapter > 0) &&
	    (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
	    (zone->writing_chapter->size > 0)) {
		uds_search_open_chapter(zone->writing_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	volume = zone->index->volume;
	if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
	    uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
				      request->zone_number))
		return search_sparse_cache_in_zone(zone, request,
						   request->virtual_chapter, found);

	return uds_search_volume_page_cache(volume, request, found);
}

static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
			      const struct uds_record_data *metadata)
{
	unsigned int remaining;

	remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
					 metadata);
	if (remaining == 0)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

static int search_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;
	bool overflow_record, found = false;
	struct uds_record_data *metadata;
	u64 chapter;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;
	}

	if (found)
		set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * If a record has overflowed a chapter index in more than one chapter (or overflowed in
	 * one chapter and collided with an existing record), it will exist as a collision record
	 * in the volume index, but we won't find it in the volume. This case needs special
	 * handling.
	 */
	overflow_record = (record.is_found && record.is_collision && !found);
	chapter = zone->newest_virtual_chapter;
	if (found || overflow_record) {
		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && overflow_record)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		if (record.virtual_chapter != chapter) {
			/*
			 * Update the volume index to reference the new chapter for the block. If
			 * the record had been deleted or dropped from the chapter index, it will
			 * be back.
			 */
			result = uds_set_volume_index_record_chapter(&record, chapter);
		} else if (request->type != UDS_UPDATE) {
			/* The record is already in the open chapter. */
			return UDS_SUCCESS;
		}
	} else {
		/*
		 * The record wasn't in the volume index, so check whether the name is in a
		 * cached sparse chapter. If we found the name on a previous search, use that
		 * result instead.
		 */
		if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
			found = true;
		} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
			found = false;
		} else if (uds_is_sparse_geometry(zone->index->volume->geometry) &&
			   !uds_is_volume_index_sample(zone->index->volume_index,
						       &request->record_name)) {
			result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
							     &found);
			if (result != UDS_SUCCESS)
				return result;
		}

		if (found)
			set_request_location(request, UDS_LOCATION_IN_SPARSE);

		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && !found)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		/*
		 * Add a new entry to the volume index referencing the open chapter. This needs to
		 * be done both for new records, and for records from cached sparse chapters.
		 */
		result = uds_put_volume_index_record(&record, chapter);
	}

	if (result == UDS_OVERFLOW) {
		/*
		 * The volume index encountered a delta list overflow. The condition was already
		 * logged. We will go on without adding the record to the open chapter.
		 */
		return UDS_SUCCESS;
	}

	if (result != UDS_SUCCESS)
		return result;

	if (!found || (request->type == UDS_UPDATE)) {
		/* This is a new record or we're updating an existing record. */
		metadata = &request->new_metadata;
	} else {
		/* Move the existing record to the open chapter. */
		metadata = &request->old_metadata;
	}

	return put_record_in_zone(zone, request, metadata);
}
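
/*
 * Summary of the dispositions above, for illustration (derived from the code,
 * not an original comment):
 *
 *	UDS_QUERY_NO_UPDATE: read-only; never modifies the index
 *	UDS_QUERY:           moves a found record into the open chapter, but
 *	                     never creates a new entry
 *	UDS_POST:            refreshes a found record with its old metadata,
 *	                     or adds a new record with new_metadata
 *	UDS_UPDATE:          always stores new_metadata in the open chapter
 */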

static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (!record.is_found)
		return UDS_SUCCESS;

	/* If the request was requeued, check whether the saved state is still valid. */

	if (record.is_collision) {
		set_chapter_location(request, zone, record.virtual_chapter);
	} else {
		/* Non-collision records are hints, so resolve the name in the chapter. */
		bool found;

		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;

		if (!found) {
			/* There is no record to remove. */
			return UDS_SUCCESS;
		}
	}

	set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * Delete the volume index entry for the named record only. Note that a later search
	 * might return stale advice if there is a colliding name in the same chapter, but that
	 * is a very rare case (1 in 2^21).
	 */
	result = uds_remove_volume_index_record(&record);
	if (result != UDS_SUCCESS)
		return result;

	/*
	 * If the record is in the open chapter, we must remove it or mark it deleted to avoid
	 * trouble if the record is added again later.
	 */
	if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
		uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);

	return UDS_SUCCESS;
}

static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
{
	int result;
	struct index_zone *zone = index->zones[request->zone_number];

	if (!request->requeued) {
		result = simulate_index_zone_barrier_message(zone, request);
		if (result != UDS_SUCCESS)
			return result;
	}

	switch (request->type) {
	case UDS_POST:
	case UDS_UPDATE:
	case UDS_QUERY:
	case UDS_QUERY_NO_UPDATE:
		result = search_index_zone(zone, request);
		break;

	case UDS_DELETE:
		result = remove_from_index_zone(zone, request);
		break;

	default:
		result = uds_log_warning_strerror(UDS_INVALID_ARGUMENT,
						  "invalid request type: %d",
						  request->type);
		break;
	}

	return result;
}

/* This is the request processing function invoked by each zone's thread. */
static void execute_zone_request(struct uds_request *request)
{
	int result;
	struct uds_index *index = request->index;

	if (request->zone_message.type != UDS_MESSAGE_NONE) {
		result = dispatch_index_zone_control_request(request);
		if (result != UDS_SUCCESS) {
			uds_log_error_strerror(result, "error executing message: %d",
					       request->zone_message.type);
		}

		/* Once the message is processed it can be freed. */
		uds_free(uds_forget(request));
		return;
	}

	index->need_to_save = true;
	if (request->requeued && (request->status != UDS_SUCCESS)) {
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);
		index->callback(request);
		return;
	}

	result = dispatch_index_request(index, request);
	if (result == UDS_QUEUED) {
		/* The request has been requeued so don't let it complete. */
		return;
	}

	if (!request->found)
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);

	request->status = result;
	index->callback(request);
}

static int initialize_index_queues(struct uds_index *index,
				   const struct geometry *geometry)
{
	int result;
	unsigned int i;

	for (i = 0; i < index->zone_count; i++) {
		result = uds_make_request_queue("indexW", &execute_zone_request,
						&index->zone_queues[i]);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* The triage queue is only needed for sparse multi-zone indexes. */
	if ((index->zone_count > 1) && uds_is_sparse_geometry(geometry)) {
		result = uds_make_request_queue("triageW", &triage_request,
						&index->triage_queue);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

/* This is the driver function for the chapter writer thread. */
static void close_chapters(void *arg)
{
	int result;
	struct chapter_writer *writer = arg;
	struct uds_index *index = writer->index;

	uds_log_debug("chapter writer starting");
	uds_lock_mutex(&writer->mutex);
	for (;;) {
		while (writer->zones_to_write < index->zone_count) {
			if (writer->stop && (writer->zones_to_write == 0)) {
				/*
				 * We've been told to stop, and all of the zones are in the same
				 * open chapter, so we can exit now.
				 */
				uds_unlock_mutex(&writer->mutex);
				uds_log_debug("chapter writer stopping");
				return;
			}
			uds_wait_cond(&writer->cond, &writer->mutex);
		}

		/*
		 * Release the lock while closing a chapter. We probably don't need to do this, but
		 * it seems safer in principle. It's OK to access the chapter and chapter_number
		 * fields without the lock since those aren't allowed to change until we're done.
		 */
		uds_unlock_mutex(&writer->mutex);

		if (index->has_saved_open_chapter) {
			/*
			 * Remove the saved open chapter the first time we close an open chapter
			 * after loading from a clean shutdown, or after doing a clean save. The
			 * lack of the saved open chapter will indicate that a recovery is
			 * necessary.
			 */
			index->has_saved_open_chapter = false;
			result = uds_discard_open_chapter(index->layout);
			if (result == UDS_SUCCESS)
				uds_log_debug("Discarding saved open chapter");
		}

		result = uds_close_open_chapter(writer->chapters, index->zone_count,
						index->volume,
						writer->open_chapter_index,
						writer->collated_records,
						index->newest_virtual_chapter);

		uds_lock_mutex(&writer->mutex);
		index->newest_virtual_chapter++;
		index->oldest_virtual_chapter +=
			uds_chapters_to_expire(index->volume->geometry,
					       index->newest_virtual_chapter);
		writer->result = result;
		writer->zones_to_write = 0;
		uds_broadcast_cond(&writer->cond);
	}
}

static void stop_chapter_writer(struct chapter_writer *writer)
{
	struct thread *writer_thread = NULL;

	uds_lock_mutex(&writer->mutex);
	if (writer->thread != NULL) {
		writer_thread = writer->thread;
		writer->thread = NULL;
		writer->stop = true;
		uds_broadcast_cond(&writer->cond);
	}
	uds_unlock_mutex(&writer->mutex);

	if (writer_thread != NULL)
		uds_join_threads(writer_thread);
}

static void free_chapter_writer(struct chapter_writer *writer)
{
	if (writer == NULL)
		return;

	stop_chapter_writer(writer);
	uds_destroy_mutex(&writer->mutex);
	uds_destroy_cond(&writer->cond);
	uds_free_open_chapter_index(writer->open_chapter_index);
	uds_free(writer->collated_records);
	uds_free(writer);
}

static int make_chapter_writer(struct uds_index *index,
			       struct chapter_writer **writer_ptr)
{
	int result;
	struct chapter_writer *writer;
	size_t collated_records_size =
		(sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);

	result = uds_allocate_extended(struct chapter_writer, index->zone_count,
				       struct open_chapter_zone *, "Chapter Writer",
				       &writer);
	if (result != UDS_SUCCESS)
		return result;

	writer->index = index;
	result = uds_init_mutex(&writer->mutex);
	if (result != UDS_SUCCESS) {
		uds_free(writer);
		return result;
	}

	result = uds_init_cond(&writer->cond);
	if (result != UDS_SUCCESS) {
		uds_destroy_mutex(&writer->mutex);
		uds_free(writer);
		return result;
	}

	result = uds_allocate_cache_aligned(collated_records_size, "collated records",
					    &writer->collated_records);
	if (result != UDS_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	result = uds_make_open_chapter_index(&writer->open_chapter_index,
					     index->volume->geometry,
					     index->volume->nonce);
	if (result != UDS_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	writer->memory_size = (sizeof(struct chapter_writer) +
			       index->zone_count * sizeof(struct open_chapter_zone *) +
			       collated_records_size +
			       writer->open_chapter_index->memory_size);

	result = uds_create_thread(close_chapters, writer, "writer", &writer->thread);
	if (result != UDS_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	*writer_ptr = writer;
	return UDS_SUCCESS;
}

static int load_index(struct uds_index *index)
{
	int result;
	u64 last_save_chapter;

	result = uds_load_index_state(index->layout, index);
	if (result != UDS_SUCCESS)
		return UDS_INDEX_NOT_SAVED_CLEANLY;

	last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);

	uds_log_info("loaded index from chapter %llu through chapter %llu",
		     (unsigned long long) index->oldest_virtual_chapter,
		     (unsigned long long) last_save_chapter);

	return UDS_SUCCESS;
}

static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
{
	int result;
	struct delta_index_page *chapter_index_page;
	struct geometry *geometry = index->volume->geometry;
	u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
	u32 expected_list_number = 0;
	u32 index_page_number;
	u32 lowest_delta_list;
	u32 highest_delta_list;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		result = uds_get_volume_index_page(index->volume, chapter,
						   index_page_number,
						   &chapter_index_page);
		if (result != UDS_SUCCESS) {
			return uds_log_error_strerror(result,
						      "failed to read index page %u in chapter %u",
						      index_page_number, chapter);
		}

		lowest_delta_list = chapter_index_page->lowest_list_number;
		highest_delta_list = chapter_index_page->highest_list_number;
		if (lowest_delta_list != expected_list_number) {
			return uds_log_error_strerror(UDS_CORRUPT_DATA,
						      "chapter %u index page %u is corrupt",
						      chapter, index_page_number);
		}

		uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
					  index_page_number, highest_delta_list);
		expected_list_number = highest_delta_list + 1;
	}

	return UDS_SUCCESS;
}
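
/*
 * Illustrative example (derived from the loop above, not an original
 * comment): if a chapter has three index pages covering delta lists 0-63,
 * 64-127, and 128-191, then lowest_list_number must be 0, 64, and 128 in
 * turn; any gap or overlap fails the expected_list_number check and the
 * chapter index is reported as corrupt.
 */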

static int replay_record(struct uds_index *index, const struct uds_record_name *name,
			 u64 virtual_chapter, bool will_be_sparse_chapter)
{
	int result;
	struct volume_index_record record;
	bool update_record;

	if (will_be_sparse_chapter &&
	    !uds_is_volume_index_sample(index->volume_index, name)) {
		/*
		 * This entry will be in a sparse chapter after the rebuild completes, and it is
		 * not a sample, so just skip over it.
		 */
		return UDS_SUCCESS;
	}

	result = uds_get_volume_index_record(index->volume_index, name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (record.is_collision) {
			if (record.virtual_chapter == virtual_chapter) {
				/* The record is already correct. */
				return UDS_SUCCESS;
			}

			update_record = true;
		} else if (record.virtual_chapter == virtual_chapter) {
			/*
			 * There is a volume index entry pointing to the current chapter, but we
			 * don't know if it is for the same name as the one we are currently
			 * working on or not. For now, we're just going to assume that it isn't.
			 * This will create one extra collision record if there was a deleted
			 * record in the current chapter.
			 */
			update_record = false;
		} else {
			/*
			 * If we're rebuilding, we don't normally want to go to disk to see if the
			 * record exists, since we will likely have just read the record from disk
			 * (i.e. we know it's there). The exception to this is when we find an
			 * entry in the volume index that has a different chapter. In this case, we
			 * need to search that chapter to determine if the volume index entry was
			 * for the same record or a different one.
			 */
			result = uds_search_volume_page_cache_for_rebuild(index->volume,
									  name,
									  record.virtual_chapter,
									  &update_record);
			if (result != UDS_SUCCESS)
				return result;
		}
	} else {
		update_record = false;
	}

	if (update_record) {
		/*
		 * Update the volume index to reference the new chapter for the block. If the
		 * record had been deleted or dropped from the chapter index, it will be back.
		 */
		result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
	} else {
		/*
		 * Add a new entry to the volume index referencing the open chapter. This needs
		 * to be done both for a brand new record and for a sparse record, i.e. one that
		 * exists on disk but not in the index, since a sparse record should be
		 * un-sparsified if it is seen again.
		 */
		result = uds_put_volume_index_record(&record, virtual_chapter);
	}

	if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
		/* The rebuilt index will lose these records. */
		return UDS_SUCCESS;
	}

	return result;
}
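
/*
 * Summary of the replay decision above, for illustration (derived from the
 * code, not an original comment):
 *
 *	no volume index entry:                   put a new record
 *	collision entry for this chapter:        already correct, done
 *	collision entry for another chapter:     update it to this chapter
 *	non-collision entry for this chapter:    assume a different name and
 *	                                         put a (possibly colliding)
 *	                                         new record
 *	non-collision entry for another chapter: search that chapter to choose
 *	                                         between update and put
 */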

static bool check_for_suspend(struct uds_index *index)
{
	bool closing;

	if (index->load_context == NULL)
		return false;

	uds_lock_mutex(&index->load_context->mutex);
	if (index->load_context->status != INDEX_SUSPENDING) {
		uds_unlock_mutex(&index->load_context->mutex);
		return false;
	}

	/* Notify that we are suspended and wait for the resume. */
	index->load_context->status = INDEX_SUSPENDED;
	uds_broadcast_cond(&index->load_context->cond);

	while ((index->load_context->status != INDEX_OPENING) &&
	       (index->load_context->status != INDEX_FREEING))
		uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);

	closing = (index->load_context->status == INDEX_FREEING);
	uds_unlock_mutex(&index->load_context->mutex);
	return closing;
}

static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
{
	int result;
	u32 i;
	u32 j;
	const struct geometry *geometry;
	u32 physical_chapter;

	if (check_for_suspend(index)) {
		uds_log_info("Replay interrupted by index shutdown at chapter %llu",
			     (unsigned long long) virtual);
		return -EBUSY;
	}

	geometry = index->volume->geometry;
	physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
	uds_prefetch_volume_chapter(index->volume, physical_chapter);
	uds_set_volume_index_open_chapter(index->volume_index, virtual);

	result = rebuild_index_page_map(index, virtual);
	if (result != UDS_SUCCESS) {
		return uds_log_error_strerror(result,
					      "could not rebuild index page map for chapter %u",
					      physical_chapter);
	}

	for (i = 0; i < geometry->record_pages_per_chapter; i++) {
		u8 *record_page;
		u32 record_page_number;

		record_page_number = geometry->index_pages_per_chapter + i;
		result = uds_get_volume_record_page(index->volume, physical_chapter,
						    record_page_number, &record_page);
		if (result != UDS_SUCCESS) {
			return uds_log_error_strerror(result, "could not get page %d",
						      record_page_number);
		}

		for (j = 0; j < geometry->records_per_page; j++) {
			const u8 *name_bytes;
			struct uds_record_name name;

			name_bytes = record_page + (j * BYTES_PER_RECORD);
			memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
			result = replay_record(index, &name, virtual, sparse);
			if (result != UDS_SUCCESS)
				return result;
		}
	}

	return UDS_SUCCESS;
}

static int replay_volume(struct uds_index *index)
{
	int result;
	u64 old_map_update;
	u64 new_map_update;
	u64 virtual;
	u64 from_virtual = index->oldest_virtual_chapter;
	u64 upto_virtual = index->newest_virtual_chapter;
	bool will_be_sparse;

	uds_log_info("Replaying volume from chapter %llu through chapter %llu",
		     (unsigned long long) from_virtual,
		     (unsigned long long) upto_virtual);

	/*
	 * The index failed to load, so the volume index is empty. Add records to the volume index
	 * in order, skipping non-hooks in chapters which will be sparse to save time.
	 *
	 * Go through each record page of each chapter and add the records back to the volume
	 * index. This should not cause anything to be written to either the open chapter or the
	 * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
	 * would have already been purged from the volume index when the chapter was opened.
	 *
	 * Also, go through each index page for each chapter and rebuild the index page map.
	 */
	old_map_update = index->volume->index_page_map->last_update;
	for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
		will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
						       from_virtual, upto_virtual,
						       virtual);
		result = replay_chapter(index, virtual, will_be_sparse);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* Also reap the chapter being replaced by the open chapter. */
	uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);

	new_map_update = index->volume->index_page_map->last_update;
	if (new_map_update != old_map_update) {
		uds_log_info("replay changed index page map update from %llu to %llu",
			     (unsigned long long) old_map_update,
			     (unsigned long long) new_map_update);
	}

	return UDS_SUCCESS;
}

static int rebuild_index(struct uds_index *index)
{
	int result;
	u64 lowest;
	u64 highest;
	bool is_empty = false;
	u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;

	index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
	result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
						    &is_empty);
	if (result != UDS_SUCCESS) {
		return uds_log_fatal_strerror(result,
					      "cannot rebuild index: unknown volume chapter boundaries");
	}

	if (is_empty) {
		index->newest_virtual_chapter = 0;
		index->oldest_virtual_chapter = 0;
		index->volume->lookup_mode = LOOKUP_NORMAL;
		return UDS_SUCCESS;
	}

	index->newest_virtual_chapter = highest + 1;
	index->oldest_virtual_chapter = lowest;
	if (index->newest_virtual_chapter ==
	    (index->oldest_virtual_chapter + chapters_per_volume)) {
		/* Skip the chapter shadowed by the open chapter. */
		index->oldest_virtual_chapter++;
	}

	result = replay_volume(index);
	if (result != UDS_SUCCESS)
		return result;

	index->volume->lookup_mode = LOOKUP_NORMAL;
	return UDS_SUCCESS;
}
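
/*
 * Worked example of the boundary arithmetic above, for illustration: with
 * chapters_per_volume = 1024, a scan finding lowest = 100 and highest = 1123
 * yields newest = 1124. Since 100 + 1024 == 1124, physical chapter 100 is
 * about to be overwritten by the new open chapter, so oldest is advanced to
 * 101 to skip the shadowed chapter.
 */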

static void free_index_zone(struct index_zone *zone)
{
	if (zone == NULL)
		return;

	uds_free_open_chapter(zone->open_chapter);
	uds_free_open_chapter(zone->writing_chapter);
	uds_free(zone);
}

static int make_index_zone(struct uds_index *index, unsigned int zone_number)
{
	int result;
	struct index_zone *zone;

	result = uds_allocate(1, struct index_zone, "index zone", &zone);
	if (result != UDS_SUCCESS)
		return result;

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->open_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->writing_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	zone->index = index;
	zone->id = zone_number;
	index->zones[zone_number] = zone;

	return UDS_SUCCESS;
}

int uds_make_index(struct configuration *config, enum uds_open_index_type open_type,
		   struct index_load_context *load_context, index_callback_fn callback,
		   struct uds_index **new_index)
{
	int result;
	bool loaded = false;
	bool new = (open_type == UDS_CREATE);
	struct uds_index *index = NULL;
	struct index_zone *zone;
	u64 nonce;
	unsigned int z;

	result = uds_allocate_extended(struct uds_index, config->zone_count,
				       struct uds_request_queue *, "index", &index);
	if (result != UDS_SUCCESS)
		return result;

	index->zone_count = config->zone_count;

	result = uds_make_index_layout(config, new, &index->layout);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = uds_allocate(index->zone_count, struct index_zone *, "zones",
			      &index->zones);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = uds_make_volume(config, index->layout, &index->volume);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	index->volume->lookup_mode = LOOKUP_NORMAL;
	for (z = 0; z < index->zone_count; z++) {
		result = make_index_zone(index, z);
		if (result != UDS_SUCCESS) {
			uds_free_index(index);
			return uds_log_error_strerror(result,
						      "Could not create index zone");
		}
	}

	nonce = uds_get_volume_nonce(index->layout);
	result = uds_make_volume_index(config, nonce, &index->volume_index);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return uds_log_error_strerror(result, "could not make volume index");
	}

	index->load_context = load_context;
	index->callback = callback;

	result = initialize_index_queues(index, config->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = make_chapter_writer(index, &index->chapter_writer);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	if (!new) {
		result = load_index(index);
		switch (result) {
		case UDS_SUCCESS:
			loaded = true;
			break;
		case -ENOMEM:
			/* We should not try a rebuild for this error. */
			uds_log_error_strerror(result, "index could not be loaded");
			break;
		default:
			uds_log_error_strerror(result, "index could not be loaded");
			if (open_type == UDS_LOAD) {
				result = rebuild_index(index);
				if (result != UDS_SUCCESS) {
					uds_log_error_strerror(result,
							       "index could not be rebuilt");
				}
			}
			break;
		}
	}

	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return uds_log_error_strerror(result, "fatal error in %s()", __func__);
	}

	for (z = 0; z < index->zone_count; z++) {
		zone = index->zones[z];
		zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
		zone->newest_virtual_chapter = index->newest_virtual_chapter;
	}

	if (index->load_context != NULL) {
		uds_lock_mutex(&index->load_context->mutex);
		index->load_context->status = INDEX_READY;
		/*
		 * If we get here, suspend is meaningless, but notify any thread trying to suspend
		 * us so it doesn't hang.
		 */
		uds_broadcast_cond(&index->load_context->cond);
		uds_unlock_mutex(&index->load_context->mutex);
	}

	index->has_saved_open_chapter = loaded;
	index->need_to_save = !loaded;
	*new_index = index;
	return UDS_SUCCESS;
}

void uds_free_index(struct uds_index *index)
{
	unsigned int i;

	if (index == NULL)
		return;

	uds_request_queue_finish(index->triage_queue);
	for (i = 0; i < index->zone_count; i++)
		uds_request_queue_finish(index->zone_queues[i]);

	free_chapter_writer(index->chapter_writer);

	uds_free_volume_index(index->volume_index);
	if (index->zones != NULL) {
		for (i = 0; i < index->zone_count; i++)
			free_index_zone(index->zones[i]);
		uds_free(index->zones);
	}

	uds_free_volume(index->volume);
	uds_free_index_layout(uds_forget(index->layout));
	uds_free(index);
}

/* Wait for the chapter writer to complete any outstanding writes. */
void uds_wait_for_idle_index(struct uds_index *index)
{
	struct chapter_writer *writer = index->chapter_writer;

	uds_lock_mutex(&writer->mutex);
	while (writer->zones_to_write > 0)
		uds_wait_cond(&writer->cond, &writer->mutex);
	uds_unlock_mutex(&writer->mutex);
}

/* This function assumes that all requests have been drained. */
int uds_save_index(struct uds_index *index)
{
	int result;

	if (!index->need_to_save)
		return UDS_SUCCESS;

	uds_wait_for_idle_index(index);
	index->prev_save = index->last_save;
	index->last_save = ((index->newest_virtual_chapter == 0) ?
			    NO_LAST_SAVE : index->newest_virtual_chapter - 1);
	uds_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);

	result = uds_save_index_state(index->layout, index);
	if (result != UDS_SUCCESS) {
		uds_log_info("save index failed");
		index->last_save = index->prev_save;
	} else {
		index->has_saved_open_chapter = true;
		index->need_to_save = false;
		uds_log_info("finished save (vcn %llu)",
			     (unsigned long long) index->last_save);
	}

	return result;
}

int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
{
	return uds_replace_volume_storage(index->volume, index->layout, bdev);
}

/* Accessing statistics should be safe from any thread. */
void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
{
	struct volume_index_stats stats;

	uds_get_volume_index_stats(index->volume_index, &stats);
	counters->entries_indexed = stats.record_count;
	counters->collisions = stats.collision_count;
	counters->entries_discarded = stats.discard_count;

	counters->memory_used = (index->volume_index->memory_size +
				 index->volume->cache_size +
				 index->chapter_writer->memory_size);
}

void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
{
	struct uds_index *index = request->index;
	struct uds_request_queue *queue;

	switch (stage) {
	case STAGE_TRIAGE:
		if (index->triage_queue != NULL) {
			queue = index->triage_queue;
			break;
		}

		fallthrough;

	case STAGE_INDEX:
		request->zone_number =
			uds_get_volume_index_zone(index->volume_index, &request->record_name);
		fallthrough;

	case STAGE_MESSAGE:
		queue = index->zone_queues[request->zone_number];
		break;

	default:
		ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
		return;
	}

	uds_request_queue_enqueue(queue, request);
}
1399}