dm vdo errors: remove unused error codes
[linux-block.git] / drivers / md / dm-vdo / packer.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6 #include "packer.h"
7
8 #include <linux/atomic.h>
9 #include <linux/blkdev.h>
10
11 #include "logger.h"
12 #include "memory-alloc.h"
13 #include "permassert.h"
14 #include "string-utils.h"
15
16 #include "admin-state.h"
17 #include "completion.h"
18 #include "constants.h"
19 #include "data-vio.h"
20 #include "dedupe.h"
21 #include "encodings.h"
22 #include "io-submitter.h"
23 #include "physical-zone.h"
24 #include "status-codes.h"
25 #include "vdo.h"
26 #include "vio.h"
27
28 static const struct version_number COMPRESSED_BLOCK_1_0 = {
29         .major_version = 1,
30         .minor_version = 0,
31 };
32
33 #define COMPRESSED_BLOCK_1_0_SIZE (4 + 4 + (2 * VDO_MAX_COMPRESSION_SLOTS))
34
35 /**
36  * vdo_get_compressed_block_fragment() - Get a reference to a compressed fragment from a compressed
37  *                                       block.
38  * @mapping_state [in] The mapping state for the look up.
39  * @compressed_block [in] The compressed block that was read from disk.
40  * @fragment_offset [out] The offset of the fragment within a compressed block.
41  * @fragment_size [out] The size of the fragment.
42  *
43  * Return: If a valid compressed fragment is found, VDO_SUCCESS; otherwise, VDO_INVALID_FRAGMENT if
44  *         the fragment is invalid.
45  */
46 int vdo_get_compressed_block_fragment(enum block_mapping_state mapping_state,
47                                       struct compressed_block *block,
48                                       u16 *fragment_offset, u16 *fragment_size)
49 {
50         u16 compressed_size;
51         u16 offset = 0;
52         unsigned int i;
53         u8 slot;
54         struct version_number version;
55
56         if (!vdo_is_state_compressed(mapping_state))
57                 return VDO_INVALID_FRAGMENT;
58
59         version = vdo_unpack_version_number(block->header.version);
60         if (!vdo_are_same_version(version, COMPRESSED_BLOCK_1_0))
61                 return VDO_INVALID_FRAGMENT;
62
63         slot = mapping_state - VDO_MAPPING_STATE_COMPRESSED_BASE;
64         if (slot >= VDO_MAX_COMPRESSION_SLOTS)
65                 return VDO_INVALID_FRAGMENT;
66
67         compressed_size = __le16_to_cpu(block->header.sizes[slot]);
68         for (i = 0; i < slot; i++) {
69                 offset += __le16_to_cpu(block->header.sizes[i]);
70                 if (offset >= VDO_COMPRESSED_BLOCK_DATA_SIZE)
71                         return VDO_INVALID_FRAGMENT;
72         }
73
74         if ((offset + compressed_size) > VDO_COMPRESSED_BLOCK_DATA_SIZE)
75                 return VDO_INVALID_FRAGMENT;
76
77         *fragment_offset = offset;
78         *fragment_size = compressed_size;
79         return VDO_SUCCESS;
80 }
81
82 /**
83  * assert_on_packer_thread() - Check that we are on the packer thread.
84  * @packer: The packer.
85  * @caller: The function which is asserting.
86  */
87 static inline void assert_on_packer_thread(struct packer *packer, const char *caller)
88 {
89         ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == packer->thread_id),
90                         "%s() called from packer thread", caller);
91 }
92
93 /**
94  * insert_in_sorted_list() - Insert a bin to the list.
95  * @packer: The packer.
96  * @bin: The bin to move to its sorted position.
97  *
98  * The list is in ascending order of free space. Since all bins are already in the list, this
99  * actually moves the bin to the correct position in the list.
100  */
101 static void insert_in_sorted_list(struct packer *packer, struct packer_bin *bin)
102 {
103         struct packer_bin *active_bin;
104
105         list_for_each_entry(active_bin, &packer->bins, list)
106                 if (active_bin->free_space > bin->free_space) {
107                         list_move_tail(&bin->list, &active_bin->list);
108                         return;
109                 }
110
111         list_move_tail(&bin->list, &packer->bins);
112 }
113
114 /**
115  * make_bin() - Allocate a bin and put it into the packer's list.
116  * @packer: The packer.
117  */
118 static int __must_check make_bin(struct packer *packer)
119 {
120         struct packer_bin *bin;
121         int result;
122
123         result = vdo_allocate_extended(struct packer_bin, VDO_MAX_COMPRESSION_SLOTS,
124                                        struct vio *, __func__, &bin);
125         if (result != VDO_SUCCESS)
126                 return result;
127
128         bin->free_space = VDO_COMPRESSED_BLOCK_DATA_SIZE;
129         INIT_LIST_HEAD(&bin->list);
130         list_add_tail(&bin->list, &packer->bins);
131         return VDO_SUCCESS;
132 }
133
134 /**
135  * vdo_make_packer() - Make a new block packer.
136  *
137  * @vdo: The vdo to which this packer belongs.
138  * @bin_count: The number of partial bins to keep in memory.
139  * @packer_ptr: A pointer to hold the new packer.
140  *
141  * Return: VDO_SUCCESS or an error
142  */
143 int vdo_make_packer(struct vdo *vdo, block_count_t bin_count, struct packer **packer_ptr)
144 {
145         struct packer *packer;
146         block_count_t i;
147         int result;
148
149         result = vdo_allocate(1, struct packer, __func__, &packer);
150         if (result != VDO_SUCCESS)
151                 return result;
152
153         packer->thread_id = vdo->thread_config.packer_thread;
154         packer->size = bin_count;
155         INIT_LIST_HEAD(&packer->bins);
156         vdo_set_admin_state_code(&packer->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
157
158         for (i = 0; i < bin_count; i++) {
159                 result = make_bin(packer);
160                 if (result != VDO_SUCCESS) {
161                         vdo_free_packer(packer);
162                         return result;
163                 }
164         }
165
166         /*
167          * The canceled bin can hold up to half the number of user vios. Every canceled vio in the
168          * bin must have a canceler for which it is waiting, and any canceler will only have
169          * canceled one lock holder at a time.
170          */
171         result = vdo_allocate_extended(struct packer_bin, MAXIMUM_VDO_USER_VIOS / 2,
172                                        struct vio *, __func__, &packer->canceled_bin);
173         if (result != VDO_SUCCESS) {
174                 vdo_free_packer(packer);
175                 return result;
176         }
177
178         result = vdo_make_default_thread(vdo, packer->thread_id);
179         if (result != VDO_SUCCESS) {
180                 vdo_free_packer(packer);
181                 return result;
182         }
183
184         *packer_ptr = packer;
185         return VDO_SUCCESS;
186 }
187
188 /**
189  * vdo_free_packer() - Free a block packer.
190  * @packer: The packer to free.
191  */
192 void vdo_free_packer(struct packer *packer)
193 {
194         struct packer_bin *bin, *tmp;
195
196         if (packer == NULL)
197                 return;
198
199         list_for_each_entry_safe(bin, tmp, &packer->bins, list) {
200                 list_del_init(&bin->list);
201                 vdo_free(bin);
202         }
203
204         vdo_free(vdo_forget(packer->canceled_bin));
205         vdo_free(packer);
206 }
207
208 /**
209  * get_packer_from_data_vio() - Get the packer from a data_vio.
210  * @data_vio: The data_vio.
211  *
212  * Return: The packer from the VDO to which the data_vio belongs.
213  */
214 static inline struct packer *get_packer_from_data_vio(struct data_vio *data_vio)
215 {
216         return vdo_from_data_vio(data_vio)->packer;
217 }
218
219 /**
220  * vdo_get_packer_statistics() - Get the current statistics from the packer.
221  * @packer: The packer to query.
222  *
223  * Return: a copy of the current statistics for the packer.
224  */
225 struct packer_statistics vdo_get_packer_statistics(const struct packer *packer)
226 {
227         const struct packer_statistics *stats = &packer->statistics;
228
229         return (struct packer_statistics) {
230                 .compressed_fragments_written = READ_ONCE(stats->compressed_fragments_written),
231                 .compressed_blocks_written = READ_ONCE(stats->compressed_blocks_written),
232                 .compressed_fragments_in_packer = READ_ONCE(stats->compressed_fragments_in_packer),
233         };
234 }
235
236 /**
237  * abort_packing() - Abort packing a data_vio.
238  * @data_vio: The data_vio to abort.
239  */
240 static void abort_packing(struct data_vio *data_vio)
241 {
242         struct packer *packer = get_packer_from_data_vio(data_vio);
243
244         WRITE_ONCE(packer->statistics.compressed_fragments_in_packer,
245                    packer->statistics.compressed_fragments_in_packer - 1);
246
247         write_data_vio(data_vio);
248 }
249
250 /**
251  * release_compressed_write_waiter() - Update a data_vio for which a successful compressed write
252  *                                     has completed and send it on its way.
253
254  * @data_vio: The data_vio to release.
255  * @allocation: The allocation to which the compressed block was written.
256  */
257 static void release_compressed_write_waiter(struct data_vio *data_vio,
258                                             struct allocation *allocation)
259 {
260         data_vio->new_mapped = (struct zoned_pbn) {
261                 .pbn = allocation->pbn,
262                 .zone = allocation->zone,
263                 .state = data_vio->compression.slot + VDO_MAPPING_STATE_COMPRESSED_BASE,
264         };
265
266         vdo_share_compressed_write_lock(data_vio, allocation->lock);
267         update_metadata_for_data_vio_write(data_vio, allocation->lock);
268 }
269
270 /**
271  * finish_compressed_write() - Finish a compressed block write.
272  * @completion: The compressed write completion.
273  *
274  * This callback is registered in continue_after_allocation().
275  */
276 static void finish_compressed_write(struct vdo_completion *completion)
277 {
278         struct data_vio *agent = as_data_vio(completion);
279         struct data_vio *client, *next;
280
281         assert_data_vio_in_allocated_zone(agent);
282
283         /*
284          * Process all the non-agent waiters first to ensure that the pbn lock can not be released
285          * until all of them have had a chance to journal their increfs.
286          */
287         for (client = agent->compression.next_in_batch; client != NULL; client = next) {
288                 next = client->compression.next_in_batch;
289                 release_compressed_write_waiter(client, &agent->allocation);
290         }
291
292         completion->error_handler = handle_data_vio_error;
293         release_compressed_write_waiter(agent, &agent->allocation);
294 }
295
296 static void handle_compressed_write_error(struct vdo_completion *completion)
297 {
298         struct data_vio *agent = as_data_vio(completion);
299         struct allocation *allocation = &agent->allocation;
300         struct data_vio *client, *next;
301
302         if (vdo_requeue_completion_if_needed(completion, allocation->zone->thread_id))
303                 return;
304
305         update_vio_error_stats(as_vio(completion),
306                                "Completing compressed write vio for physical block %llu with error",
307                                (unsigned long long) allocation->pbn);
308
309         for (client = agent->compression.next_in_batch; client != NULL; client = next) {
310                 next = client->compression.next_in_batch;
311                 write_data_vio(client);
312         }
313
314         /* Now that we've released the batch from the packer, forget the error and continue on. */
315         vdo_reset_completion(completion);
316         completion->error_handler = handle_data_vio_error;
317         write_data_vio(agent);
318 }
319
320 /**
321  * add_to_bin() - Put a data_vio in a specific packer_bin in which it will definitely fit.
322  * @bin: The bin in which to put the data_vio.
323  * @data_vio: The data_vio to add.
324  */
325 static void add_to_bin(struct packer_bin *bin, struct data_vio *data_vio)
326 {
327         data_vio->compression.bin = bin;
328         data_vio->compression.slot = bin->slots_used;
329         bin->incoming[bin->slots_used++] = data_vio;
330 }
331
332 /**
333  * remove_from_bin() - Get the next data_vio whose compression has not been canceled from a bin.
334  * @packer: The packer.
335  * @bin: The bin from which to get a data_vio.
336  *
337  * Any canceled data_vios will be moved to the canceled bin.
338  * Return: An uncanceled data_vio from the bin or NULL if there are none.
339  */
340 static struct data_vio *remove_from_bin(struct packer *packer, struct packer_bin *bin)
341 {
342         while (bin->slots_used > 0) {
343                 struct data_vio *data_vio = bin->incoming[--bin->slots_used];
344
345                 if (!advance_data_vio_compression_stage(data_vio).may_not_compress) {
346                         data_vio->compression.bin = NULL;
347                         return data_vio;
348                 }
349
350                 add_to_bin(packer->canceled_bin, data_vio);
351         }
352
353         /* The bin is now empty. */
354         bin->free_space = VDO_COMPRESSED_BLOCK_DATA_SIZE;
355         return NULL;
356 }
357
358 /**
359  * initialize_compressed_block() - Initialize a compressed block.
360  * @block: The compressed block to initialize.
361  * @size: The size of the agent's fragment.
362  *
363  * This method initializes the compressed block in the compressed write agent. Because the
364  * compressor already put the agent's compressed fragment at the start of the compressed block's
365  * data field, it needn't be copied. So all we need do is initialize the header and set the size of
366  * the agent's fragment.
367  */
368 static void initialize_compressed_block(struct compressed_block *block, u16 size)
369 {
370         /*
371          * Make sure the block layout isn't accidentally changed by changing the length of the
372          * block header.
373          */
374         BUILD_BUG_ON(sizeof(struct compressed_block_header) != COMPRESSED_BLOCK_1_0_SIZE);
375
376         block->header.version = vdo_pack_version_number(COMPRESSED_BLOCK_1_0);
377         block->header.sizes[0] = __cpu_to_le16(size);
378 }
379
380 /**
381  * pack_fragment() - Pack a data_vio's fragment into the compressed block in which it is already
382  *                   known to fit.
383  * @compression: The agent's compression_state to pack in to.
384  * @data_vio: The data_vio to pack.
385  * @offset: The offset into the compressed block at which to pack the fragment.
386  * @compressed_block: The compressed block which will be written out when batch is fully packed.
387  *
388  * Return: The new amount of space used.
389  */
390 static block_size_t __must_check pack_fragment(struct compression_state *compression,
391                                                struct data_vio *data_vio,
392                                                block_size_t offset, slot_number_t slot,
393                                                struct compressed_block *block)
394 {
395         struct compression_state *to_pack = &data_vio->compression;
396         char *fragment = to_pack->block->data;
397
398         to_pack->next_in_batch = compression->next_in_batch;
399         compression->next_in_batch = data_vio;
400         to_pack->slot = slot;
401         block->header.sizes[slot] = __cpu_to_le16(to_pack->size);
402         memcpy(&block->data[offset], fragment, to_pack->size);
403         return (offset + to_pack->size);
404 }
405
406 /**
407  * compressed_write_end_io() - The bio_end_io for a compressed block write.
408  * @bio: The bio for the compressed write.
409  */
410 static void compressed_write_end_io(struct bio *bio)
411 {
412         struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
413
414         vdo_count_completed_bios(bio);
415         set_data_vio_allocated_zone_callback(data_vio, finish_compressed_write);
416         continue_data_vio_with_error(data_vio, blk_status_to_errno(bio->bi_status));
417 }
418
419 /**
420  * write_bin() - Write out a bin.
421  * @packer: The packer.
422  * @bin: The bin to write.
423  */
424 static void write_bin(struct packer *packer, struct packer_bin *bin)
425 {
426         int result;
427         block_size_t offset;
428         slot_number_t slot = 1;
429         struct compression_state *compression;
430         struct compressed_block *block;
431         struct data_vio *agent = remove_from_bin(packer, bin);
432         struct data_vio *client;
433         struct packer_statistics *stats;
434
435         if (agent == NULL)
436                 return;
437
438         compression = &agent->compression;
439         compression->slot = 0;
440         block = compression->block;
441         initialize_compressed_block(block, compression->size);
442         offset = compression->size;
443
444         while ((client = remove_from_bin(packer, bin)) != NULL)
445                 offset = pack_fragment(compression, client, offset, slot++, block);
446
447         /*
448          * If the batch contains only a single vio, then we save nothing by saving the compressed
449          * form. Continue processing the single vio in the batch.
450          */
451         if (slot == 1) {
452                 abort_packing(agent);
453                 return;
454         }
455
456         if (slot < VDO_MAX_COMPRESSION_SLOTS) {
457                 /* Clear out the sizes of the unused slots */
458                 memset(&block->header.sizes[slot], 0,
459                        (VDO_MAX_COMPRESSION_SLOTS - slot) * sizeof(__le16));
460         }
461
462         agent->vio.completion.error_handler = handle_compressed_write_error;
463         if (vdo_is_read_only(vdo_from_data_vio(agent))) {
464                 continue_data_vio_with_error(agent, VDO_READ_ONLY);
465                 return;
466         }
467
468         result = vio_reset_bio(&agent->vio, (char *) block, compressed_write_end_io,
469                                REQ_OP_WRITE, agent->allocation.pbn);
470         if (result != VDO_SUCCESS) {
471                 continue_data_vio_with_error(agent, result);
472                 return;
473         }
474
475         /*
476          * Once the compressed write is submitted, the fragments are no longer in the packer, so
477          * update stats now.
478          */
479         stats = &packer->statistics;
480         WRITE_ONCE(stats->compressed_fragments_in_packer,
481                    (stats->compressed_fragments_in_packer - slot));
482         WRITE_ONCE(stats->compressed_fragments_written,
483                    (stats->compressed_fragments_written + slot));
484         WRITE_ONCE(stats->compressed_blocks_written,
485                    stats->compressed_blocks_written + 1);
486
487         vdo_submit_data_vio(agent);
488 }
489
490 /**
491  * add_data_vio_to_packer_bin() - Add a data_vio to a bin's incoming queue
492  * @packer: The packer.
493  * @bin: The bin to which to add the data_vio.
494  * @data_vio: The data_vio to add to the bin's queue.
495  *
496  * Adds a data_vio to a bin's incoming queue, handles logical space change, and calls physical
497  * space processor.
498  */
499 static void add_data_vio_to_packer_bin(struct packer *packer, struct packer_bin *bin,
500                                        struct data_vio *data_vio)
501 {
502         /* If the selected bin doesn't have room, start a new batch to make room. */
503         if (bin->free_space < data_vio->compression.size)
504                 write_bin(packer, bin);
505
506         add_to_bin(bin, data_vio);
507         bin->free_space -= data_vio->compression.size;
508
509         /* If we happen to exactly fill the bin, start a new batch. */
510         if ((bin->slots_used == VDO_MAX_COMPRESSION_SLOTS) ||
511             (bin->free_space == 0))
512                 write_bin(packer, bin);
513
514         /* Now that we've finished changing the free space, restore the sort order. */
515         insert_in_sorted_list(packer, bin);
516 }
517
518 /**
519  * select_bin() - Select the bin that should be used to pack the compressed data in a data_vio with
520  *                other data_vios.
521  * @packer: The packer.
522  * @data_vio: The data_vio.
523  */
524 static struct packer_bin * __must_check select_bin(struct packer *packer,
525                                                    struct data_vio *data_vio)
526 {
527         /*
528          * First best fit: select the bin with the least free space that has enough room for the
529          * compressed data in the data_vio.
530          */
531         struct packer_bin *bin, *fullest_bin;
532
533         list_for_each_entry(bin, &packer->bins, list) {
534                 if (bin->free_space >= data_vio->compression.size)
535                         return bin;
536         }
537
538         /*
539          * None of the bins have enough space for the data_vio. We're not allowed to create new
540          * bins, so we have to overflow one of the existing bins. It's pretty intuitive to select
541          * the fullest bin, since that "wastes" the least amount of free space in the compressed
542          * block. But if the space currently used in the fullest bin is smaller than the compressed
543          * size of the incoming block, it seems wrong to force that bin to write when giving up on
544          * compressing the incoming data_vio would likewise "waste" the least amount of free space.
545          */
546         fullest_bin = list_first_entry(&packer->bins, struct packer_bin, list);
547         if (data_vio->compression.size >=
548             (VDO_COMPRESSED_BLOCK_DATA_SIZE - fullest_bin->free_space))
549                 return NULL;
550
551         /*
552          * The fullest bin doesn't have room, but writing it out and starting a new batch with the
553          * incoming data_vio will increase the packer's free space.
554          */
555         return fullest_bin;
556 }
557
558 /**
559  * vdo_attempt_packing() - Attempt to rewrite the data in this data_vio as part of a compressed
560  *                         block.
561  * @data_vio: The data_vio to pack.
562  */
563 void vdo_attempt_packing(struct data_vio *data_vio)
564 {
565         int result;
566         struct packer_bin *bin;
567         struct data_vio_compression_status status = get_data_vio_compression_status(data_vio);
568         struct packer *packer = get_packer_from_data_vio(data_vio);
569
570         assert_on_packer_thread(packer, __func__);
571
572         result = ASSERT((status.stage == DATA_VIO_COMPRESSING),
573                         "attempt to pack data_vio not ready for packing, stage: %u",
574                         status.stage);
575         if (result != VDO_SUCCESS)
576                 return;
577
578         /*
579          * Increment whether or not this data_vio will be packed or not since abort_packing()
580          * always decrements the counter.
581          */
582         WRITE_ONCE(packer->statistics.compressed_fragments_in_packer,
583                    packer->statistics.compressed_fragments_in_packer + 1);
584
585         /*
586          * If packing of this data_vio is disallowed for administrative reasons, give up before
587          * making any state changes.
588          */
589         if (!vdo_is_state_normal(&packer->state) ||
590             (data_vio->flush_generation < packer->flush_generation)) {
591                 abort_packing(data_vio);
592                 return;
593         }
594
595         /*
596          * The advance_data_vio_compression_stage() check here verifies that the data_vio is
597          * allowed to be compressed (if it has already been canceled, we'll fall out here). Once
598          * the data_vio is in the DATA_VIO_PACKING state, it must be guaranteed to be put in a bin
599          * before any more requests can be processed by the packer thread. Otherwise, a canceling
600          * data_vio could attempt to remove the canceled data_vio from the packer and fail to
601          * rendezvous with it. Thus, we must call select_bin() first to ensure that we will
602          * actually add the data_vio to a bin before advancing to the DATA_VIO_PACKING stage.
603          */
604         bin = select_bin(packer, data_vio);
605         if ((bin == NULL) ||
606             (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_PACKING)) {
607                 abort_packing(data_vio);
608                 return;
609         }
610
611         add_data_vio_to_packer_bin(packer, bin, data_vio);
612 }
613
614 /**
615  * check_for_drain_complete() - Check whether the packer has drained.
616  * @packer: The packer.
617  */
618 static void check_for_drain_complete(struct packer *packer)
619 {
620         if (vdo_is_state_draining(&packer->state) && (packer->canceled_bin->slots_used == 0))
621                 vdo_finish_draining(&packer->state);
622 }
623
624 /**
625  * write_all_non_empty_bins() - Write out all non-empty bins on behalf of a flush or suspend.
626  * @packer: The packer being flushed.
627  */
628 static void write_all_non_empty_bins(struct packer *packer)
629 {
630         struct packer_bin *bin;
631
632         list_for_each_entry(bin, &packer->bins, list)
633                 write_bin(packer, bin);
634                 /*
635                  * We don't need to re-sort the bin here since this loop will make every bin have
636                  * the same amount of free space, so every ordering is sorted.
637                  */
638
639         check_for_drain_complete(packer);
640 }
641
642 /**
643  * vdo_flush_packer() - Request that the packer flush asynchronously.
644  * @packer: The packer to flush.
645  *
646  * All bins with at least two compressed data blocks will be written out, and any solitary pending
647  * VIOs will be released from the packer. While flushing is in progress, any VIOs submitted to
648  * vdo_attempt_packing() will be continued immediately without attempting to pack them.
649  */
650 void vdo_flush_packer(struct packer *packer)
651 {
652         assert_on_packer_thread(packer, __func__);
653         if (vdo_is_state_normal(&packer->state))
654                 write_all_non_empty_bins(packer);
655 }
656
657 /**
658  * vdo_remove_lock_holder_from_packer() - Remove a lock holder from the packer.
659  * @completion: The data_vio which needs a lock held by a data_vio in the packer. The data_vio's
660  *              compression.lock_holder field will point to the data_vio to remove.
661  */
662 void vdo_remove_lock_holder_from_packer(struct vdo_completion *completion)
663 {
664         struct data_vio *data_vio = as_data_vio(completion);
665         struct packer *packer = get_packer_from_data_vio(data_vio);
666         struct data_vio *lock_holder;
667         struct packer_bin *bin;
668         slot_number_t slot;
669
670         assert_data_vio_in_packer_zone(data_vio);
671
672         lock_holder = vdo_forget(data_vio->compression.lock_holder);
673         bin = lock_holder->compression.bin;
674         ASSERT_LOG_ONLY((bin != NULL), "data_vio in packer has a bin");
675
676         slot = lock_holder->compression.slot;
677         bin->slots_used--;
678         if (slot < bin->slots_used) {
679                 bin->incoming[slot] = bin->incoming[bin->slots_used];
680                 bin->incoming[slot]->compression.slot = slot;
681         }
682
683         lock_holder->compression.bin = NULL;
684         lock_holder->compression.slot = 0;
685
686         if (bin != packer->canceled_bin) {
687                 bin->free_space += lock_holder->compression.size;
688                 insert_in_sorted_list(packer, bin);
689         }
690
691         abort_packing(lock_holder);
692         check_for_drain_complete(packer);
693 }
694
695 /**
696  * vdo_increment_packer_flush_generation() - Increment the flush generation in the packer.
697  * @packer: The packer.
698  *
699  * This will also cause the packer to flush so that any VIOs from previous generations will exit
700  * the packer.
701  */
702 void vdo_increment_packer_flush_generation(struct packer *packer)
703 {
704         assert_on_packer_thread(packer, __func__);
705         packer->flush_generation++;
706         vdo_flush_packer(packer);
707 }
708
709 /**
710  * initiate_drain() - Initiate a drain.
711  *
712  * Implements vdo_admin_initiator_fn.
713  */
714 static void initiate_drain(struct admin_state *state)
715 {
716         struct packer *packer = container_of(state, struct packer, state);
717
718         write_all_non_empty_bins(packer);
719 }
720
721 /**
722  * vdo_drain_packer() - Drain the packer by preventing any more VIOs from entering the packer and
723  *                      then flushing.
724  * @packer: The packer to drain.
725  * @completion: The completion to finish when the packer has drained.
726  */
727 void vdo_drain_packer(struct packer *packer, struct vdo_completion *completion)
728 {
729         assert_on_packer_thread(packer, __func__);
730         vdo_start_draining(&packer->state, VDO_ADMIN_STATE_SUSPENDING, completion,
731                            initiate_drain);
732 }
733
734 /**
735  * vdo_resume_packer() - Resume a packer which has been suspended.
736  * @packer: The packer to resume.
737  * @parent: The completion to finish when the packer has resumed.
738  */
739 void vdo_resume_packer(struct packer *packer, struct vdo_completion *parent)
740 {
741         assert_on_packer_thread(packer, __func__);
742         vdo_continue_completion(parent, vdo_resume_if_quiescent(&packer->state));
743 }
744
745 static void dump_packer_bin(const struct packer_bin *bin, bool canceled)
746 {
747         if (bin->slots_used == 0)
748                 /* Don't dump empty bins. */
749                 return;
750
751         uds_log_info("    %sBin slots_used=%u free_space=%zu",
752                      (canceled ? "Canceled" : ""), bin->slots_used, bin->free_space);
753
754         /*
755          * FIXME: dump vios in bin->incoming? The vios should have been dumped from the vio pool.
756          * Maybe just dump their addresses so it's clear they're here?
757          */
758 }
759
760 /**
761  * vdo_dump_packer() - Dump the packer.
762  * @packer: The packer.
763  *
764  * Context: dumps in a thread-unsafe fashion.
765  */
766 void vdo_dump_packer(const struct packer *packer)
767 {
768         struct packer_bin *bin;
769
770         uds_log_info("packer");
771         uds_log_info("  flushGeneration=%llu state %s  packer_bin_count=%llu",
772                      (unsigned long long) packer->flush_generation,
773                      vdo_get_admin_state_code(&packer->state)->name,
774                      (unsigned long long) packer->size);
775
776         list_for_each_entry(bin, &packer->bins, list)
777                 dump_packer_bin(bin, false);
778
779         dump_packer_bin(packer->canceled_bin, true);
780 }