28ae54efacede431873a522db99063d54d8f25f9
[linux-2.6-block.git] / drivers / md / dm-vdo / index.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6
7 #include "index.h"
8
9 #include "funnel-requestqueue.h"
10 #include "hash-utils.h"
11 #include "logger.h"
12 #include "memory-alloc.h"
13 #include "sparse-cache.h"
14
/*
 * Sentinel equal to the largest u64 value. Presumably it marks that no previous save exists;
 * its users are outside this part of the file -- TODO confirm.
 */
static const u64 NO_LAST_SAVE = U64_MAX;
16
17 /*
18  * When searching for deduplication records, the index first searches the volume index, and then
19  * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
20  * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
21  * committed (either the open chapter or a recently closed one), the index searches the in-memory
22  * representation of the chapter. Finally, if the volume index does not find a record and the index
23  * is sparse, the index will search the sparse cache.
24  *
25  * The index sends two kinds of messages to coordinate between zones: chapter close messages for the
26  * chapter writer, and sparse cache barrier messages for the sparse cache.
27  *
28  * The chapter writer is responsible for committing chapters of records to storage. Since zones can
29  * get different numbers of records, some zones may fall behind others. Each time a zone fills up
30  * its available space in a chapter, it informs the chapter writer that the chapter is complete,
31  * and also informs all other zones that it has closed the chapter. Each other zone will then close
32  * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
33  * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
34  *
35  * The last zone to close the chapter also removes the oldest chapter from the volume index.
36  * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
37  * means that those zones will never ask the volume index about it. No zone is allowed to get more
38  * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
39  * chapter before the previous one has been closed by all zones, it is forced to wait.
40  *
41  * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
42  * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
43  * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
44  * paused its operations, the cache membership is changed and each zone is then informed that it
45  * can proceed. More details can be found in the sparse cache documentation.
46  *
47  * If a sparse cache has only one zone, it will not create a triage queue, but it still needs the
48  * barrier message to change the sparse cache membership, so the index simulates the message by
49  * invoking the handler directly.
50  */
51
/*
 * State shared between the zone threads and the chapter writer thread. Zones hand over their
 * closed chapters through start_closing_chapter(), and the writer thread (close_chapters())
 * commits a chapter once every zone has submitted its part.
 */
struct chapter_writer {
	/* The index to which we belong */
	struct uds_index *index;
	/* The thread to do the writing; cleared by stop_chapter_writer() before it joins the thread */
	struct thread *thread;
	/* The lock protecting the following fields */
	struct mutex mutex;
	/* The condition signalled on state changes (zone submissions, completed writes, stop) */
	struct cond_var cond;
	/* Set to true to stop the thread */
	bool stop;
	/* The result from the most recent write; reported to zones by finish_previous_chapter() */
	int result;
	/* The number of bytes allocated by the chapter writer */
	size_t memory_size;
	/*
	 * The number of zones which have submitted a chapter for writing; reset to zero by the
	 * writer thread after each chapter is written
	 */
	unsigned int zones_to_write;
	/* Open chapter index used by uds_close_open_chapter() */
	struct open_chapter_index *open_chapter_index;
	/* Collated records used by uds_close_open_chapter() */
	struct uds_volume_record *collated_records;
	/* The chapters to write (one per zone, indexed by zone number) */
	struct open_chapter_zone *chapters[];
};
76
77 static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
78 {
79         return uds_is_chapter_sparse(zone->index->volume->geometry,
80                                      zone->oldest_virtual_chapter,
81                                      zone->newest_virtual_chapter, virtual_chapter);
82 }
83
84 static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
85                                struct uds_index *index)
86 {
87         int result;
88         struct uds_request *request;
89
90         result = uds_allocate(1, struct uds_request, __func__, &request);
91         if (result != UDS_SUCCESS)
92                 return result;
93
94         request->index = index;
95         request->unbatched = true;
96         request->zone_number = zone;
97         request->zone_message = message;
98
99         uds_enqueue_request(request, STAGE_MESSAGE);
100         return UDS_SUCCESS;
101 }
102
103 static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
104 {
105         struct uds_zone_message message = {
106                 .type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
107                 .virtual_chapter = virtual_chapter,
108         };
109         unsigned int zone;
110
111         for (zone = 0; zone < index->zone_count; zone++) {
112                 int result = launch_zone_message(message, zone, index);
113
114                 ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
115         }
116 }
117
118 /*
119  * Determine whether this request should trigger a sparse cache barrier message to change the
120  * membership of the sparse cache. If a change in membership is desired, the function returns the
121  * chapter number to add.
122  */
123 static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
124 {
125         u64 virtual_chapter;
126         struct index_zone *zone;
127
128         virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
129                                                        &request->record_name);
130         if (virtual_chapter == NO_CHAPTER)
131                 return NO_CHAPTER;
132
133         zone = index->zones[request->zone_number];
134         if (!is_zone_chapter_sparse(zone, virtual_chapter))
135                 return NO_CHAPTER;
136
137         /*
138          * FIXME: Optimize for a common case by remembering the chapter from the most recent
139          * barrier message and skipping this chapter if is it the same.
140          */
141
142         return virtual_chapter;
143 }
144
145 /*
146  * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
147  * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
148  * of index does nothing here.
149  */
150 static int simulate_index_zone_barrier_message(struct index_zone *zone,
151                                                struct uds_request *request)
152 {
153         u64 sparse_virtual_chapter;
154
155         if ((zone->index->zone_count > 1) ||
156             !uds_is_sparse_geometry(zone->index->volume->geometry))
157                 return UDS_SUCCESS;
158
159         sparse_virtual_chapter = triage_index_request(zone->index, request);
160         if (sparse_virtual_chapter == NO_CHAPTER)
161                 return UDS_SUCCESS;
162
163         return uds_update_sparse_cache(zone, sparse_virtual_chapter);
164 }
165
166 /* This is the request processing function for the triage queue. */
167 static void triage_request(struct uds_request *request)
168 {
169         struct uds_index *index = request->index;
170         u64 sparse_virtual_chapter = triage_index_request(index, request);
171
172         if (sparse_virtual_chapter != NO_CHAPTER)
173                 enqueue_barrier_messages(index, sparse_virtual_chapter);
174
175         uds_enqueue_request(request, STAGE_INDEX);
176 }
177
178 static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
179 {
180         int result;
181         struct chapter_writer *writer = index->chapter_writer;
182
183         uds_lock_mutex(&writer->mutex);
184         while (index->newest_virtual_chapter < current_chapter_number)
185                 uds_wait_cond(&writer->cond, &writer->mutex);
186         result = writer->result;
187         uds_unlock_mutex(&writer->mutex);
188
189         if (result != UDS_SUCCESS)
190                 return uds_log_error_strerror(result,
191                                               "Writing of previous open chapter failed");
192
193         return UDS_SUCCESS;
194 }
195
196 static int swap_open_chapter(struct index_zone *zone)
197 {
198         int result;
199         struct open_chapter_zone *temporary_chapter;
200
201         result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
202         if (result != UDS_SUCCESS)
203                 return result;
204
205         temporary_chapter = zone->open_chapter;
206         zone->open_chapter = zone->writing_chapter;
207         zone->writing_chapter = temporary_chapter;
208         return UDS_SUCCESS;
209 }
210
211 /*
212  * Inform the chapter writer that this zone is done with this chapter. The chapter won't start
213  * writing until all zones have closed it.
214  */
215 static unsigned int start_closing_chapter(struct uds_index *index,
216                                           unsigned int zone_number,
217                                           struct open_chapter_zone *chapter)
218 {
219         unsigned int finished_zones;
220         struct chapter_writer *writer = index->chapter_writer;
221
222         uds_lock_mutex(&writer->mutex);
223         finished_zones = ++writer->zones_to_write;
224         writer->chapters[zone_number] = chapter;
225         uds_broadcast_cond(&writer->cond);
226         uds_unlock_mutex(&writer->mutex);
227
228         return finished_zones;
229 }
230
231 static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
232 {
233         int result;
234         unsigned int i;
235         struct uds_zone_message zone_message = {
236                 .type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
237                 .virtual_chapter = closed_chapter,
238         };
239
240         for (i = 0; i < zone->index->zone_count; i++) {
241                 if (zone->id == i)
242                         continue;
243
244                 result = launch_zone_message(zone_message, i, zone->index);
245                 if (result != UDS_SUCCESS)
246                         return result;
247         }
248
249         return UDS_SUCCESS;
250 }
251
252 static int open_next_chapter(struct index_zone *zone)
253 {
254         int result;
255         u64 closed_chapter;
256         u64 expiring;
257         unsigned int finished_zones;
258         u32 expire_chapters;
259
260         uds_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
261                       (unsigned long long) zone->newest_virtual_chapter, zone->id,
262                       zone->open_chapter->size,
263                       zone->open_chapter->capacity - zone->open_chapter->size);
264
265         result = swap_open_chapter(zone);
266         if (result != UDS_SUCCESS)
267                 return result;
268
269         closed_chapter = zone->newest_virtual_chapter++;
270         uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
271                                                zone->newest_virtual_chapter);
272         uds_reset_open_chapter(zone->open_chapter);
273
274         finished_zones = start_closing_chapter(zone->index, zone->id,
275                                                zone->writing_chapter);
276         if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
277                 result = announce_chapter_closed(zone, closed_chapter);
278                 if (result != UDS_SUCCESS)
279                         return result;
280         }
281
282         expiring = zone->oldest_virtual_chapter;
283         expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
284                                                  zone->newest_virtual_chapter);
285         zone->oldest_virtual_chapter += expire_chapters;
286
287         if (finished_zones < zone->index->zone_count)
288                 return UDS_SUCCESS;
289
290         while (expire_chapters-- > 0)
291                 uds_forget_chapter(zone->index->volume, expiring++);
292
293         return UDS_SUCCESS;
294 }
295
296 static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
297 {
298         if (zone->newest_virtual_chapter == virtual_chapter)
299                 return open_next_chapter(zone);
300
301         return UDS_SUCCESS;
302 }
303
304 static int dispatch_index_zone_control_request(struct uds_request *request)
305 {
306         struct uds_zone_message *message = &request->zone_message;
307         struct index_zone *zone = request->index->zones[request->zone_number];
308
309         switch (message->type) {
310         case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
311                 return uds_update_sparse_cache(zone, message->virtual_chapter);
312
313         case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
314                 return handle_chapter_closed(zone, message->virtual_chapter);
315
316         default:
317                 uds_log_error("invalid message type: %d", message->type);
318                 return UDS_INVALID_ARGUMENT;
319         }
320 }
321
322 static void set_request_location(struct uds_request *request,
323                                  enum uds_index_region new_location)
324 {
325         request->location = new_location;
326         request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
327                           (new_location == UDS_LOCATION_IN_DENSE) ||
328                           (new_location == UDS_LOCATION_IN_SPARSE));
329 }
330
331 static void set_chapter_location(struct uds_request *request,
332                                  const struct index_zone *zone, u64 virtual_chapter)
333 {
334         request->found = true;
335         if (virtual_chapter == zone->newest_virtual_chapter)
336                 request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
337         else if (is_zone_chapter_sparse(zone, virtual_chapter))
338                 request->location = UDS_LOCATION_IN_SPARSE;
339         else
340                 request->location = UDS_LOCATION_IN_DENSE;
341 }
342
343 static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
344                                        u64 virtual_chapter, bool *found)
345 {
346         int result;
347         struct volume *volume;
348         u16 record_page_number;
349         u32 chapter;
350
351         result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
352                                          &record_page_number);
353         if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
354                 return result;
355
356         request->virtual_chapter = virtual_chapter;
357         volume = zone->index->volume;
358         chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
359         return uds_search_cached_record_page(volume, request, chapter,
360                                              record_page_number, found);
361 }
362
363 static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
364                                 bool *found)
365 {
366         struct volume *volume;
367
368         if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
369                 *found = true;
370                 return UDS_SUCCESS;
371         } else if (request->location == UDS_LOCATION_UNAVAILABLE) {
372                 *found = false;
373                 return UDS_SUCCESS;
374         }
375
376         if (request->virtual_chapter == zone->newest_virtual_chapter) {
377                 uds_search_open_chapter(zone->open_chapter, &request->record_name,
378                                         &request->old_metadata, found);
379                 return UDS_SUCCESS;
380         }
381
382         if ((zone->newest_virtual_chapter > 0) &&
383             (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
384             (zone->writing_chapter->size > 0)) {
385                 uds_search_open_chapter(zone->writing_chapter, &request->record_name,
386                                         &request->old_metadata, found);
387                 return UDS_SUCCESS;
388         }
389
390         volume = zone->index->volume;
391         if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
392             uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
393                                       request->zone_number))
394                 return search_sparse_cache_in_zone(zone, request,
395                                                    request->virtual_chapter, found);
396
397         return uds_search_volume_page_cache(volume, request, found);
398 }
399
400 static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
401                               const struct uds_record_data *metadata)
402 {
403         unsigned int remaining;
404
405         remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
406                                          metadata);
407         if (remaining == 0)
408                 return open_next_chapter(zone);
409
410         return UDS_SUCCESS;
411 }
412
413 static int search_index_zone(struct index_zone *zone, struct uds_request *request)
414 {
415         int result;
416         struct volume_index_record record;
417         bool overflow_record, found = false;
418         struct uds_record_data *metadata;
419         u64 chapter;
420
421         result = uds_get_volume_index_record(zone->index->volume_index,
422                                              &request->record_name, &record);
423         if (result != UDS_SUCCESS)
424                 return result;
425
426         if (record.is_found) {
427                 if (request->requeued && request->virtual_chapter != record.virtual_chapter)
428                         set_request_location(request, UDS_LOCATION_UNKNOWN);
429
430                 request->virtual_chapter = record.virtual_chapter;
431                 result = get_record_from_zone(zone, request, &found);
432                 if (result != UDS_SUCCESS)
433                         return result;
434         }
435
436         if (found)
437                 set_chapter_location(request, zone, record.virtual_chapter);
438
439         /*
440          * If a record has overflowed a chapter index in more than one chapter (or overflowed in
441          * one chapter and collided with an existing record), it will exist as a collision record
442          * in the volume index, but we won't find it in the volume. This case needs special
443          * handling.
444          */
445         overflow_record = (record.is_found && record.is_collision && !found);
446         chapter = zone->newest_virtual_chapter;
447         if (found || overflow_record) {
448                 if ((request->type == UDS_QUERY_NO_UPDATE) ||
449                     ((request->type == UDS_QUERY) && overflow_record)) {
450                         /* There is nothing left to do. */
451                         return UDS_SUCCESS;
452                 }
453
454                 if (record.virtual_chapter != chapter) {
455                         /*
456                          * Update the volume index to reference the new chapter for the block. If
457                          * the record had been deleted or dropped from the chapter index, it will
458                          * be back.
459                          */
460                         result = uds_set_volume_index_record_chapter(&record, chapter);
461                 } else if (request->type != UDS_UPDATE) {
462                         /* The record is already in the open chapter. */
463                         return UDS_SUCCESS;
464                 }
465         } else {
466                 /*
467                  * The record wasn't in the volume index, so check whether the
468                  * name is in a cached sparse chapter. If we found the name on
469                  * a previous search, use that result instead.
470                  */
471                 if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
472                         found = true;
473                 } else if (request->location == UDS_LOCATION_UNAVAILABLE) {
474                         found = false;
475                 } else if (uds_is_sparse_geometry(zone->index->volume->geometry) &&
476                            !uds_is_volume_index_sample(zone->index->volume_index,
477                                                        &request->record_name)) {
478                         result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
479                                                              &found);
480                         if (result != UDS_SUCCESS)
481                                 return result;
482                 }
483
484                 if (found)
485                         set_request_location(request, UDS_LOCATION_IN_SPARSE);
486
487                 if ((request->type == UDS_QUERY_NO_UPDATE) ||
488                     ((request->type == UDS_QUERY) && !found)) {
489                         /* There is nothing left to do. */
490                         return UDS_SUCCESS;
491                 }
492
493                 /*
494                  * Add a new entry to the volume index referencing the open chapter. This needs to
495                  * be done both for new records, and for records from cached sparse chapters.
496                  */
497                 result = uds_put_volume_index_record(&record, chapter);
498         }
499
500         if (result == UDS_OVERFLOW) {
501                 /*
502                  * The volume index encountered a delta list overflow. The condition was already
503                  * logged. We will go on without adding the record to the open chapter.
504                  */
505                 return UDS_SUCCESS;
506         }
507
508         if (result != UDS_SUCCESS)
509                 return result;
510
511         if (!found || (request->type == UDS_UPDATE)) {
512                 /* This is a new record or we're updating an existing record. */
513                 metadata = &request->new_metadata;
514         } else {
515                 /* Move the existing record to the open chapter. */
516                 metadata = &request->old_metadata;
517         }
518
519         return put_record_in_zone(zone, request, metadata);
520 }
521
522 static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
523 {
524         int result;
525         struct volume_index_record record;
526
527         result = uds_get_volume_index_record(zone->index->volume_index,
528                                              &request->record_name, &record);
529         if (result != UDS_SUCCESS)
530                 return result;
531
532         if (!record.is_found)
533                 return UDS_SUCCESS;
534
535         /* If the request was requeued, check whether the saved state is still valid. */
536
537         if (record.is_collision) {
538                 set_chapter_location(request, zone, record.virtual_chapter);
539         } else {
540                 /* Non-collision records are hints, so resolve the name in the chapter. */
541                 bool found;
542
543                 if (request->requeued && request->virtual_chapter != record.virtual_chapter)
544                         set_request_location(request, UDS_LOCATION_UNKNOWN);
545
546                 request->virtual_chapter = record.virtual_chapter;
547                 result = get_record_from_zone(zone, request, &found);
548                 if (result != UDS_SUCCESS)
549                         return result;
550
551                 if (!found) {
552                         /* There is no record to remove. */
553                         return UDS_SUCCESS;
554                 }
555         }
556
557         set_chapter_location(request, zone, record.virtual_chapter);
558
559         /*
560          * Delete the volume index entry for the named record only. Note that a later search might
561          * later return stale advice if there is a colliding name in the same chapter, but it's a
562          * very rare case (1 in 2^21).
563          */
564         result = uds_remove_volume_index_record(&record);
565         if (result != UDS_SUCCESS)
566                 return result;
567
568         /*
569          * If the record is in the open chapter, we must remove it or mark it deleted to avoid
570          * trouble if the record is added again later.
571          */
572         if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
573                 uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);
574
575         return UDS_SUCCESS;
576 }
577
578 static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
579 {
580         int result;
581         struct index_zone *zone = index->zones[request->zone_number];
582
583         if (!request->requeued) {
584                 result = simulate_index_zone_barrier_message(zone, request);
585                 if (result != UDS_SUCCESS)
586                         return result;
587         }
588
589         switch (request->type) {
590         case UDS_POST:
591         case UDS_UPDATE:
592         case UDS_QUERY:
593         case UDS_QUERY_NO_UPDATE:
594                 result = search_index_zone(zone, request);
595                 break;
596
597         case UDS_DELETE:
598                 result = remove_from_index_zone(zone, request);
599                 break;
600
601         default:
602                 result = uds_log_warning_strerror(UDS_INVALID_ARGUMENT,
603                                                   "invalid request type: %d",
604                                                   request->type);
605                 break;
606         }
607
608         return result;
609 }
610
611 /* This is the request processing function invoked by each zone's thread. */
612 static void execute_zone_request(struct uds_request *request)
613 {
614         int result;
615         struct uds_index *index = request->index;
616
617         if (request->zone_message.type != UDS_MESSAGE_NONE) {
618                 result = dispatch_index_zone_control_request(request);
619                 if (result != UDS_SUCCESS) {
620                         uds_log_error_strerror(result, "error executing message: %d",
621                                                request->zone_message.type);
622                 }
623
624                 /* Once the message is processed it can be freed. */
625                 uds_free(uds_forget(request));
626                 return;
627         }
628
629         index->need_to_save = true;
630         if (request->requeued && (request->status != UDS_SUCCESS)) {
631                 set_request_location(request, UDS_LOCATION_UNAVAILABLE);
632                 index->callback(request);
633                 return;
634         }
635
636         result = dispatch_index_request(index, request);
637         if (result == UDS_QUEUED) {
638                 /* The request has been requeued so don't let it complete. */
639                 return;
640         }
641
642         if (!request->found)
643                 set_request_location(request, UDS_LOCATION_UNAVAILABLE);
644
645         request->status = result;
646         index->callback(request);
647 }
648
649 static int initialize_index_queues(struct uds_index *index,
650                                    const struct geometry *geometry)
651 {
652         int result;
653         unsigned int i;
654
655         for (i = 0; i < index->zone_count; i++) {
656                 result = uds_make_request_queue("indexW", &execute_zone_request,
657                                                 &index->zone_queues[i]);
658                 if (result != UDS_SUCCESS)
659                         return result;
660         }
661
662         /* The triage queue is only needed for sparse multi-zone indexes. */
663         if ((index->zone_count > 1) && uds_is_sparse_geometry(geometry)) {
664                 result = uds_make_request_queue("triageW", &triage_request,
665                                                 &index->triage_queue);
666                 if (result != UDS_SUCCESS)
667                         return result;
668         }
669
670         return UDS_SUCCESS;
671 }
672
673 /* This is the driver function for the chapter writer thread. */
674 static void close_chapters(void *arg)
675 {
676         int result;
677         struct chapter_writer *writer = arg;
678         struct uds_index *index = writer->index;
679
680         uds_log_debug("chapter writer starting");
681         uds_lock_mutex(&writer->mutex);
682         for (;;) {
683                 while (writer->zones_to_write < index->zone_count) {
684                         if (writer->stop && (writer->zones_to_write == 0)) {
685                                 /*
686                                  * We've been told to stop, and all of the zones are in the same
687                                  * open chapter, so we can exit now.
688                                  */
689                                 uds_unlock_mutex(&writer->mutex);
690                                 uds_log_debug("chapter writer stopping");
691                                 return;
692                         }
693                         uds_wait_cond(&writer->cond, &writer->mutex);
694                 }
695
696                 /*
697                  * Release the lock while closing a chapter. We probably don't need to do this, but
698                  * it seems safer in principle. It's OK to access the chapter and chapter_number
699                  * fields without the lock since those aren't allowed to change until we're done.
700                  */
701                 uds_unlock_mutex(&writer->mutex);
702
703                 if (index->has_saved_open_chapter) {
704                         /*
705                          * Remove the saved open chapter the first time we close an open chapter
706                          * after loading from a clean shutdown, or after doing a clean save. The
707                          * lack of the saved open chapter will indicate that a recovery is
708                          * necessary.
709                          */
710                         index->has_saved_open_chapter = false;
711                         result = uds_discard_open_chapter(index->layout);
712                         if (result == UDS_SUCCESS)
713                                 uds_log_debug("Discarding saved open chapter");
714                 }
715
716                 result = uds_close_open_chapter(writer->chapters, index->zone_count,
717                                                 index->volume,
718                                                 writer->open_chapter_index,
719                                                 writer->collated_records,
720                                                 index->newest_virtual_chapter);
721
722                 uds_lock_mutex(&writer->mutex);
723                 index->newest_virtual_chapter++;
724                 index->oldest_virtual_chapter +=
725                         uds_chapters_to_expire(index->volume->geometry,
726                                                index->newest_virtual_chapter);
727                 writer->result = result;
728                 writer->zones_to_write = 0;
729                 uds_broadcast_cond(&writer->cond);
730         }
731 }
732
733 static void stop_chapter_writer(struct chapter_writer *writer)
734 {
735         struct thread *writer_thread = 0;
736
737         uds_lock_mutex(&writer->mutex);
738         if (writer->thread != 0) {
739                 writer_thread = writer->thread;
740                 writer->thread = 0;
741                 writer->stop = true;
742                 uds_broadcast_cond(&writer->cond);
743         }
744         uds_unlock_mutex(&writer->mutex);
745
746         if (writer_thread != 0)
747                 uds_join_threads(writer_thread);
748 }
749
750 static void free_chapter_writer(struct chapter_writer *writer)
751 {
752         if (writer == NULL)
753                 return;
754
755         stop_chapter_writer(writer);
756         uds_destroy_mutex(&writer->mutex);
757         uds_destroy_cond(&writer->cond);
758         uds_free_open_chapter_index(writer->open_chapter_index);
759         uds_free(writer->collated_records);
760         uds_free(writer);
761 }
762
763 static int make_chapter_writer(struct uds_index *index,
764                                struct chapter_writer **writer_ptr)
765 {
766         int result;
767         struct chapter_writer *writer;
768         size_t collated_records_size =
769                 (sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);
770
771         result = uds_allocate_extended(struct chapter_writer, index->zone_count,
772                                        struct open_chapter_zone *, "Chapter Writer",
773                                        &writer);
774         if (result != UDS_SUCCESS)
775                 return result;
776
777         writer->index = index;
778         result = uds_init_mutex(&writer->mutex);
779         if (result != UDS_SUCCESS) {
780                 uds_free(writer);
781                 return result;
782         }
783
784         result = uds_init_cond(&writer->cond);
785         if (result != UDS_SUCCESS) {
786                 uds_destroy_mutex(&writer->mutex);
787                 uds_free(writer);
788                 return result;
789         }
790
791         result = uds_allocate_cache_aligned(collated_records_size, "collated records",
792                                             &writer->collated_records);
793         if (result != UDS_SUCCESS) {
794                 free_chapter_writer(writer);
795                 return result;
796         }
797
798         result = uds_make_open_chapter_index(&writer->open_chapter_index,
799                                              index->volume->geometry,
800                                              index->volume->nonce);
801         if (result != UDS_SUCCESS) {
802                 free_chapter_writer(writer);
803                 return result;
804         }
805
806         writer->memory_size = (sizeof(struct chapter_writer) +
807                                index->zone_count * sizeof(struct open_chapter_zone *) +
808                                collated_records_size +
809                                writer->open_chapter_index->memory_size);
810
811         result = uds_create_thread(close_chapters, writer, "writer", &writer->thread);
812         if (result != UDS_SUCCESS) {
813                 free_chapter_writer(writer);
814                 return result;
815         }
816
817         *writer_ptr = writer;
818         return UDS_SUCCESS;
819 }
820
821 static int load_index(struct uds_index *index)
822 {
823         int result;
824         u64 last_save_chapter;
825
826         result = uds_load_index_state(index->layout, index);
827         if (result != UDS_SUCCESS)
828                 return UDS_INDEX_NOT_SAVED_CLEANLY;
829
830         last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);
831
832         uds_log_info("loaded index from chapter %llu through chapter %llu",
833                      (unsigned long long) index->oldest_virtual_chapter,
834                      (unsigned long long) last_save_chapter);
835
836         return UDS_SUCCESS;
837 }
838
839 static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
840 {
841         int result;
842         struct delta_index_page *chapter_index_page;
843         struct geometry *geometry = index->volume->geometry;
844         u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
845         u32 expected_list_number = 0;
846         u32 index_page_number;
847         u32 lowest_delta_list;
848         u32 highest_delta_list;
849
850         for (index_page_number = 0;
851              index_page_number < geometry->index_pages_per_chapter;
852              index_page_number++) {
853                 result = uds_get_volume_index_page(index->volume, chapter,
854                                                    index_page_number,
855                                                    &chapter_index_page);
856                 if (result != UDS_SUCCESS) {
857                         return uds_log_error_strerror(result,
858                                                       "failed to read index page %u in chapter %u",
859                                                       index_page_number, chapter);
860                 }
861
862                 lowest_delta_list = chapter_index_page->lowest_list_number;
863                 highest_delta_list = chapter_index_page->highest_list_number;
864                 if (lowest_delta_list != expected_list_number) {
865                         return uds_log_error_strerror(UDS_CORRUPT_DATA,
866                                                       "chapter %u index page %u is corrupt",
867                                                       chapter, index_page_number);
868                 }
869
870                 uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
871                                           index_page_number, highest_delta_list);
872                 expected_list_number = highest_delta_list + 1;
873         }
874
875         return UDS_SUCCESS;
876 }
877
878 static int replay_record(struct uds_index *index, const struct uds_record_name *name,
879                          u64 virtual_chapter, bool will_be_sparse_chapter)
880 {
881         int result;
882         struct volume_index_record record;
883         bool update_record;
884
885         if (will_be_sparse_chapter &&
886             !uds_is_volume_index_sample(index->volume_index, name)) {
887                 /*
888                  * This entry will be in a sparse chapter after the rebuild completes, and it is
889                  * not a sample, so just skip over it.
890                  */
891                 return UDS_SUCCESS;
892         }
893
894         result = uds_get_volume_index_record(index->volume_index, name, &record);
895         if (result != UDS_SUCCESS)
896                 return result;
897
898         if (record.is_found) {
899                 if (record.is_collision) {
900                         if (record.virtual_chapter == virtual_chapter) {
901                                 /* The record is already correct. */
902                                 return UDS_SUCCESS;
903                         }
904
905                         update_record = true;
906                 } else if (record.virtual_chapter == virtual_chapter) {
907                         /*
908                          * There is a volume index entry pointing to the current chapter, but we
909                          * don't know if it is for the same name as the one we are currently
910                          * working on or not. For now, we're just going to assume that it isn't.
911                          * This will create one extra collision record if there was a deleted
912                          * record in the current chapter.
913                          */
914                         update_record = false;
915                 } else {
916                         /*
917                          * If we're rebuilding, we don't normally want to go to disk to see if the
918                          * record exists, since we will likely have just read the record from disk
919                          * (i.e. we know it's there). The exception to this is when we find an
920                          * entry in the volume index that has a different chapter. In this case, we
921                          * need to search that chapter to determine if the volume index entry was
922                          * for the same record or a different one.
923                          */
924                         result = uds_search_volume_page_cache_for_rebuild(index->volume,
925                                                                           name,
926                                                                           record.virtual_chapter,
927                                                                           &update_record);
928                         if (result != UDS_SUCCESS)
929                                 return result;
930                         }
931         } else {
932                 update_record = false;
933         }
934
935         if (update_record) {
936                 /*
937                  * Update the volume index to reference the new chapter for the block. If the
938                  * record had been deleted or dropped from the chapter index, it will be back.
939                  */
940                 result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
941         } else {
942                 /*
943                  * Add a new entry to the volume index referencing the open chapter. This should be
944                  * done regardless of whether we are a brand new record or a sparse record, i.e.
945                  * one that doesn't exist in the index but does on disk, since for a sparse record,
946                  * we would want to un-sparsify if it did exist.
947                  */
948                 result = uds_put_volume_index_record(&record, virtual_chapter);
949         }
950
951         if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
952                 /* The rebuilt index will lose these records. */
953                 return UDS_SUCCESS;
954         }
955
956         return result;
957 }
958
959 static bool check_for_suspend(struct uds_index *index)
960 {
961         bool closing;
962
963         if (index->load_context == NULL)
964                 return false;
965
966         uds_lock_mutex(&index->load_context->mutex);
967         if (index->load_context->status != INDEX_SUSPENDING) {
968                 uds_unlock_mutex(&index->load_context->mutex);
969                 return false;
970         }
971
972         /* Notify that we are suspended and wait for the resume. */
973         index->load_context->status = INDEX_SUSPENDED;
974         uds_broadcast_cond(&index->load_context->cond);
975
976         while ((index->load_context->status != INDEX_OPENING) &&
977                (index->load_context->status != INDEX_FREEING))
978                 uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);
979
980         closing = (index->load_context->status == INDEX_FREEING);
981         uds_unlock_mutex(&index->load_context->mutex);
982         return closing;
983 }
984
985 static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
986 {
987         int result;
988         u32 i;
989         u32 j;
990         const struct geometry *geometry;
991         u32 physical_chapter;
992
993         if (check_for_suspend(index)) {
994                 uds_log_info("Replay interrupted by index shutdown at chapter %llu",
995                              (unsigned long long) virtual);
996                 return -EBUSY;
997         }
998
999         geometry = index->volume->geometry;
1000         physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
1001         uds_prefetch_volume_chapter(index->volume, physical_chapter);
1002         uds_set_volume_index_open_chapter(index->volume_index, virtual);
1003
1004         result = rebuild_index_page_map(index, virtual);
1005         if (result != UDS_SUCCESS) {
1006                 return uds_log_error_strerror(result,
1007                                               "could not rebuild index page map for chapter %u",
1008                                               physical_chapter);
1009         }
1010
1011         for (i = 0; i < geometry->record_pages_per_chapter; i++) {
1012                 u8 *record_page;
1013                 u32 record_page_number;
1014
1015                 record_page_number = geometry->index_pages_per_chapter + i;
1016                 result = uds_get_volume_record_page(index->volume, physical_chapter,
1017                                                     record_page_number, &record_page);
1018                 if (result != UDS_SUCCESS) {
1019                         return uds_log_error_strerror(result, "could not get page %d",
1020                                                       record_page_number);
1021                 }
1022
1023                 for (j = 0; j < geometry->records_per_page; j++) {
1024                         const u8 *name_bytes;
1025                         struct uds_record_name name;
1026
1027                         name_bytes = record_page + (j * BYTES_PER_RECORD);
1028                         memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
1029                         result = replay_record(index, &name, virtual, sparse);
1030                         if (result != UDS_SUCCESS)
1031                                 return result;
1032                 }
1033         }
1034
1035         return UDS_SUCCESS;
1036 }
1037
1038 static int replay_volume(struct uds_index *index)
1039 {
1040         int result;
1041         u64 old_map_update;
1042         u64 new_map_update;
1043         u64 virtual;
1044         u64 from_virtual = index->oldest_virtual_chapter;
1045         u64 upto_virtual = index->newest_virtual_chapter;
1046         bool will_be_sparse;
1047
1048         uds_log_info("Replaying volume from chapter %llu through chapter %llu",
1049                      (unsigned long long) from_virtual,
1050                      (unsigned long long) upto_virtual);
1051
1052         /*
1053          * The index failed to load, so the volume index is empty. Add records to the volume index
1054          * in order, skipping non-hooks in chapters which will be sparse to save time.
1055          *
1056          * Go through each record page of each chapter and add the records back to the volume
1057          * index. This should not cause anything to be written to either the open chapter or the
1058          * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
1059          * would have already been purged from the volume index when the chapter was opened.
1060          *
1061          * Also, go through each index page for each chapter and rebuild the index page map.
1062          */
1063         old_map_update = index->volume->index_page_map->last_update;
1064         for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
1065                 will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
1066                                                        from_virtual, upto_virtual,
1067                                                        virtual);
1068                 result = replay_chapter(index, virtual, will_be_sparse);
1069                 if (result != UDS_SUCCESS)
1070                         return result;
1071         }
1072
1073         /* Also reap the chapter being replaced by the open chapter. */
1074         uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);
1075
1076         new_map_update = index->volume->index_page_map->last_update;
1077         if (new_map_update != old_map_update) {
1078                 uds_log_info("replay changed index page map update from %llu to %llu",
1079                              (unsigned long long) old_map_update,
1080                              (unsigned long long) new_map_update);
1081         }
1082
1083         return UDS_SUCCESS;
1084 }
1085
1086 static int rebuild_index(struct uds_index *index)
1087 {
1088         int result;
1089         u64 lowest;
1090         u64 highest;
1091         bool is_empty = false;
1092         u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;
1093
1094         index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
1095         result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
1096                                                     &is_empty);
1097         if (result != UDS_SUCCESS) {
1098                 return uds_log_fatal_strerror(result,
1099                                               "cannot rebuild index: unknown volume chapter boundaries");
1100         }
1101
1102         if (is_empty) {
1103                 index->newest_virtual_chapter = 0;
1104                 index->oldest_virtual_chapter = 0;
1105                 index->volume->lookup_mode = LOOKUP_NORMAL;
1106                 return UDS_SUCCESS;
1107         }
1108
1109         index->newest_virtual_chapter = highest + 1;
1110         index->oldest_virtual_chapter = lowest;
1111         if (index->newest_virtual_chapter ==
1112             (index->oldest_virtual_chapter + chapters_per_volume)) {
1113                 /* Skip the chapter shadowed by the open chapter. */
1114                 index->oldest_virtual_chapter++;
1115         }
1116
1117         result = replay_volume(index);
1118         if (result != UDS_SUCCESS)
1119                 return result;
1120
1121         index->volume->lookup_mode = LOOKUP_NORMAL;
1122         return UDS_SUCCESS;
1123 }
1124
1125 static void free_index_zone(struct index_zone *zone)
1126 {
1127         if (zone == NULL)
1128                 return;
1129
1130         uds_free_open_chapter(zone->open_chapter);
1131         uds_free_open_chapter(zone->writing_chapter);
1132         uds_free(zone);
1133 }
1134
1135 static int make_index_zone(struct uds_index *index, unsigned int zone_number)
1136 {
1137         int result;
1138         struct index_zone *zone;
1139
1140         result = uds_allocate(1, struct index_zone, "index zone", &zone);
1141         if (result != UDS_SUCCESS)
1142                 return result;
1143
1144         result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
1145                                        &zone->open_chapter);
1146         if (result != UDS_SUCCESS) {
1147                 free_index_zone(zone);
1148                 return result;
1149         }
1150
1151         result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
1152                                        &zone->writing_chapter);
1153         if (result != UDS_SUCCESS) {
1154                 free_index_zone(zone);
1155                 return result;
1156         }
1157
1158         zone->index = index;
1159         zone->id = zone_number;
1160         index->zones[zone_number] = zone;
1161
1162         return UDS_SUCCESS;
1163 }
1164
1165 int uds_make_index(struct configuration *config, enum uds_open_index_type open_type,
1166                    struct index_load_context *load_context, index_callback_fn callback,
1167                    struct uds_index **new_index)
1168 {
1169         int result;
1170         bool loaded = false;
1171         bool new = (open_type == UDS_CREATE);
1172         struct uds_index *index = NULL;
1173         struct index_zone *zone;
1174         u64 nonce;
1175         unsigned int z;
1176
1177         result = uds_allocate_extended(struct uds_index, config->zone_count,
1178                                        struct uds_request_queue *, "index", &index);
1179         if (result != UDS_SUCCESS)
1180                 return result;
1181
1182         index->zone_count = config->zone_count;
1183
1184         result = uds_make_index_layout(config, new, &index->layout);
1185         if (result != UDS_SUCCESS) {
1186                 uds_free_index(index);
1187                 return result;
1188         }
1189
1190         result = uds_allocate(index->zone_count, struct index_zone *, "zones",
1191                               &index->zones);
1192         if (result != UDS_SUCCESS) {
1193                 uds_free_index(index);
1194                 return result;
1195         }
1196
1197         result = uds_make_volume(config, index->layout, &index->volume);
1198         if (result != UDS_SUCCESS) {
1199                 uds_free_index(index);
1200                 return result;
1201         }
1202
1203         index->volume->lookup_mode = LOOKUP_NORMAL;
1204         for (z = 0; z < index->zone_count; z++) {
1205                 result = make_index_zone(index, z);
1206                 if (result != UDS_SUCCESS) {
1207                         uds_free_index(index);
1208                         return uds_log_error_strerror(result,
1209                                                       "Could not create index zone");
1210                 }
1211         }
1212
1213         nonce = uds_get_volume_nonce(index->layout);
1214         result = uds_make_volume_index(config, nonce, &index->volume_index);
1215         if (result != UDS_SUCCESS) {
1216                 uds_free_index(index);
1217                 return uds_log_error_strerror(result, "could not make volume index");
1218         }
1219
1220         index->load_context = load_context;
1221         index->callback = callback;
1222
1223         result = initialize_index_queues(index, config->geometry);
1224         if (result != UDS_SUCCESS) {
1225                 uds_free_index(index);
1226                 return result;
1227         }
1228
1229         result = make_chapter_writer(index, &index->chapter_writer);
1230         if (result != UDS_SUCCESS) {
1231                 uds_free_index(index);
1232                 return result;
1233         }
1234
1235         if (!new) {
1236                 result = load_index(index);
1237                 switch (result) {
1238                 case UDS_SUCCESS:
1239                         loaded = true;
1240                         break;
1241                 case -ENOMEM:
1242                         /* We should not try a rebuild for this error. */
1243                         uds_log_error_strerror(result, "index could not be loaded");
1244                         break;
1245                 default:
1246                         uds_log_error_strerror(result, "index could not be loaded");
1247                         if (open_type == UDS_LOAD) {
1248                                 result = rebuild_index(index);
1249                                 if (result != UDS_SUCCESS) {
1250                                         uds_log_error_strerror(result,
1251                                                                "index could not be rebuilt");
1252                                 }
1253                         }
1254                         break;
1255                 }
1256         }
1257
1258         if (result != UDS_SUCCESS) {
1259                 uds_free_index(index);
1260                 return uds_log_error_strerror(result, "fatal error in %s()", __func__);
1261         }
1262
1263         for (z = 0; z < index->zone_count; z++) {
1264                 zone = index->zones[z];
1265                 zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
1266                 zone->newest_virtual_chapter = index->newest_virtual_chapter;
1267         }
1268
1269         if (index->load_context != NULL) {
1270                 uds_lock_mutex(&index->load_context->mutex);
1271                 index->load_context->status = INDEX_READY;
1272                 /*
1273                  * If we get here, suspend is meaningless, but notify any thread trying to suspend
1274                  * us so it doesn't hang.
1275                  */
1276                 uds_broadcast_cond(&index->load_context->cond);
1277                 uds_unlock_mutex(&index->load_context->mutex);
1278         }
1279
1280         index->has_saved_open_chapter = loaded;
1281         index->need_to_save = !loaded;
1282         *new_index = index;
1283         return UDS_SUCCESS;
1284 }
1285
1286 void uds_free_index(struct uds_index *index)
1287 {
1288         unsigned int i;
1289
1290         if (index == NULL)
1291                 return;
1292
1293         uds_request_queue_finish(index->triage_queue);
1294         for (i = 0; i < index->zone_count; i++)
1295                 uds_request_queue_finish(index->zone_queues[i]);
1296
1297         free_chapter_writer(index->chapter_writer);
1298
1299         uds_free_volume_index(index->volume_index);
1300         if (index->zones != NULL) {
1301                 for (i = 0; i < index->zone_count; i++)
1302                         free_index_zone(index->zones[i]);
1303                 uds_free(index->zones);
1304         }
1305
1306         uds_free_volume(index->volume);
1307         uds_free_index_layout(uds_forget(index->layout));
1308         uds_free(index);
1309 }
1310
1311 /* Wait for the chapter writer to complete any outstanding writes. */
1312 void uds_wait_for_idle_index(struct uds_index *index)
1313 {
1314         struct chapter_writer *writer = index->chapter_writer;
1315
1316         uds_lock_mutex(&writer->mutex);
1317         while (writer->zones_to_write > 0)
1318                 uds_wait_cond(&writer->cond, &writer->mutex);
1319         uds_unlock_mutex(&writer->mutex);
1320 }
1321
1322 /* This function assumes that all requests have been drained. */
1323 int uds_save_index(struct uds_index *index)
1324 {
1325         int result;
1326
1327         if (!index->need_to_save)
1328                 return UDS_SUCCESS;
1329
1330         uds_wait_for_idle_index(index);
1331         index->prev_save = index->last_save;
1332         index->last_save = ((index->newest_virtual_chapter == 0) ?
1333                             NO_LAST_SAVE :
1334                             index->newest_virtual_chapter - 1);
1335         uds_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);
1336
1337         result = uds_save_index_state(index->layout, index);
1338         if (result != UDS_SUCCESS) {
1339                 uds_log_info("save index failed");
1340                 index->last_save = index->prev_save;
1341         } else {
1342                 index->has_saved_open_chapter = true;
1343                 index->need_to_save = false;
1344                 uds_log_info("finished save (vcn %llu)",
1345                              (unsigned long long) index->last_save);
1346         }
1347
1348         return result;
1349 }
1350
1351 int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
1352 {
1353         return uds_replace_volume_storage(index->volume, index->layout, bdev);
1354 }
1355
1356 /* Accessing statistics should be safe from any thread. */
1357 void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
1358 {
1359         struct volume_index_stats stats;
1360
1361         uds_get_volume_index_stats(index->volume_index, &stats);
1362         counters->entries_indexed = stats.record_count;
1363         counters->collisions = stats.collision_count;
1364         counters->entries_discarded = stats.discard_count;
1365
1366         counters->memory_used = (index->volume_index->memory_size +
1367                                  index->volume->cache_size +
1368                                  index->chapter_writer->memory_size);
1369 }
1370
1371 void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
1372 {
1373         struct uds_index *index = request->index;
1374         struct uds_request_queue *queue;
1375
1376         switch (stage) {
1377         case STAGE_TRIAGE:
1378                 if (index->triage_queue != NULL) {
1379                         queue = index->triage_queue;
1380                         break;
1381                 }
1382
1383                 fallthrough;
1384
1385         case STAGE_INDEX:
1386                 request->zone_number =
1387                         uds_get_volume_index_zone(index->volume_index, &request->record_name);
1388                 fallthrough;
1389
1390         case STAGE_MESSAGE:
1391                 queue = index->zone_queues[request->zone_number];
1392                 break;
1393
1394         default:
1395                 ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
1396                 return;
1397         }
1398
1399         uds_request_queue_enqueue(queue, request);
1400 }