b3ffb85518c3c76d3754e0f217899820fa755a5d
[linux-2.6-block.git] / drivers / md / dm-vdo / dedupe.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6 /**
7  * DOC:
8  *
9  * Hash Locks:
10  *
11  * A hash_lock controls and coordinates writing, index access, and dedupe among groups of data_vios
12  * concurrently writing identical blocks, allowing them to deduplicate not only against advice but
13  * also against each other. This saves on index queries and allows those data_vios to concurrently
14  * deduplicate against a single block instead of being serialized through a PBN read lock. Only one
15  * index query is needed for each hash_lock, instead of one for every data_vio.
16  *
17  * A hash_lock acts like a state machine perhaps more than as a lock. Other than the starting and
18  * ending states INITIALIZING and BYPASSING, every state represents and is held for the duration of
19  * an asynchronous operation. All state transitions are performed on the thread of the hash_zone
20  * containing the lock. An asynchronous operation is almost always performed upon entering a state,
21  * and the callback from that operation triggers exiting the state and entering a new state.
22  *
23  * In all states except DEDUPING, there is a single data_vio, called the lock agent, performing the
24  * asynchronous operations on behalf of the lock. The agent will change during the lifetime of the
25  * lock if the lock is shared by more than one data_vio. data_vios waiting to deduplicate are kept
26  * on a wait queue. Viewed a different way, the agent holds the lock exclusively until the lock
27  * enters the DEDUPING state, at which point it becomes a shared lock that all the waiters (and any
28  * new data_vios that arrive) use to share a PBN lock. In state DEDUPING, there is no agent. When
29  * the last data_vio in the lock calls back in DEDUPING, it becomes the agent and the lock becomes
30  * exclusive again. New data_vios that arrive in the lock will also go on the wait queue.
31  *
32  * The existence of lock waiters is a key factor controlling which state the lock transitions to
33  * next. When the lock is new or has waiters, it will always try to reach DEDUPING, and when it
34  * doesn't, it will try to clean up and exit.
35  *
36  * Deduping requires holding a PBN lock on a block that is known to contain data identical to the
37  * data_vios in the lock, so the lock will send the agent to the duplicate zone to acquire the PBN
38  * lock (LOCKING), to the kernel I/O threads to read and verify the data (VERIFYING), or to write a
39  * new copy of the data to a full data block or a slot in a compressed block (WRITING).
40  *
41  * Cleaning up consists of updating the index when the data location is different from the initial
42  * index query (UPDATING, triggered by stale advice, compression, and rollover), releasing the PBN
43  * lock on the duplicate block (UNLOCKING), and if the agent is the last data_vio referencing the
44  * lock, releasing the hash_lock itself back to the hash zone (BYPASSING).
45  *
46  * The shortest sequence of states is for non-concurrent writes of new data:
47  *   INITIALIZING -> QUERYING -> WRITING -> BYPASSING
48  * This sequence is short because no PBN read lock or index update is needed.
49  *
50  * Non-concurrent, finding valid advice looks like this (endpoints elided):
51  *   -> QUERYING -> LOCKING -> VERIFYING -> DEDUPING -> UNLOCKING ->
52  * Or with stale advice (endpoints elided):
53  *   -> QUERYING -> LOCKING -> VERIFYING -> UNLOCKING -> WRITING -> UPDATING ->
54  *
55  * When there are not enough available reference count increments available on a PBN for a data_vio
56  * to deduplicate, a new lock is forked and the excess waiters roll over to the new lock (which
57  * goes directly to WRITING). The new lock takes the place of the old lock in the lock map so new
58  * data_vios will be directed to it. The two locks will proceed independently, but only the new
59  * lock will have the right to update the index (unless it also forks).
60  *
61  * Since rollover happens in a lock instance, once a valid data location has been selected, it will
62  * not change. QUERYING and WRITING are only performed once per lock lifetime. All other
63  * non-endpoint states can be re-entered.
64  *
65  * The function names in this module follow a convention referencing the states and transitions in
66  * the state machine. For example, for the LOCKING state, there are start_locking() and
67  * finish_locking() functions.  start_locking() is invoked by the finish function of the state (or
68  * states) that transition to LOCKING. It performs the actual lock state change and must be invoked
69  * on the hash zone thread.  finish_locking() is called by (or continued via callback from) the
70  * code actually obtaining the lock. It does any bookkeeping or decision-making required and
71  * invokes the appropriate start function of the state being transitioned to after LOCKING.
72  *
73  * ----------------------------------------------------------------------
74  *
75  * Index Queries:
76  *
77  * A query to the UDS index is handled asynchronously by the index's threads. When the query is
78  * complete, a callback supplied with the query will be called from one of the those threads. Under
79  * heavy system load, the index may be slower to respond then is desirable for reasonable I/O
80  * throughput. Since deduplication of writes is not necessary for correct operation of a VDO
81  * device, it is acceptable to timeout out slow index queries and proceed to fulfill a write
82  * request without deduplicating. However, because the uds_request struct itself is supplied by the
83  * caller, we can not simply reuse a uds_request object which we have chosen to timeout. Hence,
84  * each hash_zone maintains a pool of dedupe_contexts which each contain a uds_request along with a
85  * reference to the data_vio on behalf of which they are performing a query.
86  *
87  * When a hash_lock needs to query the index, it attempts to acquire an unused dedupe_context from
88  * its hash_zone's pool. If one is available, that context is prepared, associated with the
89  * hash_lock's agent, added to the list of pending contexts, and then sent to the index. The
90  * context's state will be transitioned from DEDUPE_CONTEXT_IDLE to DEDUPE_CONTEXT_PENDING. If all
91  * goes well, the dedupe callback will be called by the index which will change the context's state
92  * to DEDUPE_CONTEXT_COMPLETE, and the associated data_vio will be enqueued to run back in the hash
93  * zone where the query results will be processed and the context will be put back in the idle
94  * state and returned to the hash_zone's available list.
95  *
96  * The first time an index query is launched from a given hash_zone, a timer is started. When the
97  * timer fires, the hash_zone's completion is enqueued to run in the hash_zone where the zone's
98  * pending list will be searched for any contexts in the pending state which have been running for
99  * too long. Those contexts are transitioned to the DEDUPE_CONTEXT_TIMED_OUT state and moved to the
100  * zone's timed_out list where they won't be examined again if there is a subsequent time out). The
101  * data_vios associated with timed out contexts are sent to continue processing their write
102  * operation without deduplicating. The timer is also restarted.
103  *
104  * When the dedupe callback is run for a context which is in the timed out state, that context is
105  * moved to the DEDUPE_CONTEXT_TIMED_OUT_COMPLETE state. No other action need be taken as the
106  * associated data_vios have already been dispatched.
107  *
108  * If a hash_lock needs a dedupe context, and the available list is empty, the timed_out list will
109  * be searched for any contexts which are timed out and complete. One of these will be used
110  * immediately, and the rest will be returned to the available list and marked idle.
111  */
112
113 #include "dedupe.h"
114
115 #include <linux/atomic.h>
116 #include <linux/jiffies.h>
117 #include <linux/kernel.h>
118 #include <linux/kobject.h>
119 #include <linux/list.h>
120 #include <linux/ratelimit.h>
121 #include <linux/spinlock.h>
122 #include <linux/timer.h>
123
124 #include "logger.h"
125 #include "memory-alloc.h"
126 #include "numeric.h"
127 #include "permassert.h"
128 #include "string-utils.h"
129 #include "uds.h"
130
131 #include "action-manager.h"
132 #include "admin-state.h"
133 #include "completion.h"
134 #include "constants.h"
135 #include "data-vio.h"
136 #include "int-map.h"
137 #include "io-submitter.h"
138 #include "packer.h"
139 #include "physical-zone.h"
140 #include "slab-depot.h"
141 #include "statistics.h"
142 #include "types.h"
143 #include "vdo.h"
144 #include "wait-queue.h"
145
146 struct uds_attribute {
147         struct attribute attr;
148         const char *(*show_string)(struct hash_zones *hash_zones);
149 };
150
151 enum timer_state {
152         DEDUPE_QUERY_TIMER_IDLE,
153         DEDUPE_QUERY_TIMER_RUNNING,
154         DEDUPE_QUERY_TIMER_FIRED,
155 };
156
157 enum dedupe_context_state {
158         DEDUPE_CONTEXT_IDLE,
159         DEDUPE_CONTEXT_PENDING,
160         DEDUPE_CONTEXT_TIMED_OUT,
161         DEDUPE_CONTEXT_COMPLETE,
162         DEDUPE_CONTEXT_TIMED_OUT_COMPLETE,
163 };
164
165 /* Possible index states: closed, opened, or transitioning between those two. */
166 enum index_state {
167         IS_CLOSED,
168         IS_CHANGING,
169         IS_OPENED,
170 };
171
172 static const char *CLOSED = "closed";
173 static const char *CLOSING = "closing";
174 static const char *ERROR = "error";
175 static const char *OFFLINE = "offline";
176 static const char *ONLINE = "online";
177 static const char *OPENING = "opening";
178 static const char *SUSPENDED = "suspended";
179 static const char *UNKNOWN = "unknown";
180
181 /* Version 2 uses the kernel space UDS index and is limited to 16 bytes */
182 enum {
183         UDS_ADVICE_VERSION = 2,
184         /* version byte + state byte + 64-bit little-endian PBN */
185         UDS_ADVICE_SIZE = 1 + 1 + sizeof(u64),
186 };
187
188 enum hash_lock_state {
189         /* State for locks that are not in use or are being initialized. */
190         VDO_HASH_LOCK_INITIALIZING,
191
192         /* This is the sequence of states typically used on the non-dedupe path. */
193         VDO_HASH_LOCK_QUERYING,
194         VDO_HASH_LOCK_WRITING,
195         VDO_HASH_LOCK_UPDATING,
196
197         /* The remaining states are typically used on the dedupe path in this order. */
198         VDO_HASH_LOCK_LOCKING,
199         VDO_HASH_LOCK_VERIFYING,
200         VDO_HASH_LOCK_DEDUPING,
201         VDO_HASH_LOCK_UNLOCKING,
202
203         /*
204          * Terminal state for locks returning to the pool. Must be last both because it's the final
205          * state, and also because it's used to count the states.
206          */
207         VDO_HASH_LOCK_BYPASSING,
208 };
209
210 static const char * const LOCK_STATE_NAMES[] = {
211         [VDO_HASH_LOCK_BYPASSING] = "BYPASSING",
212         [VDO_HASH_LOCK_DEDUPING] = "DEDUPING",
213         [VDO_HASH_LOCK_INITIALIZING] = "INITIALIZING",
214         [VDO_HASH_LOCK_LOCKING] = "LOCKING",
215         [VDO_HASH_LOCK_QUERYING] = "QUERYING",
216         [VDO_HASH_LOCK_UNLOCKING] = "UNLOCKING",
217         [VDO_HASH_LOCK_UPDATING] = "UPDATING",
218         [VDO_HASH_LOCK_VERIFYING] = "VERIFYING",
219         [VDO_HASH_LOCK_WRITING] = "WRITING",
220 };
221
222 struct hash_lock {
223         /* The block hash covered by this lock */
224         struct uds_record_name hash;
225
226         /* When the lock is unused, this list entry allows the lock to be pooled */
227         struct list_head pool_node;
228
229         /*
230          * A list containing the data VIOs sharing this lock, all having the same record name and
231          * data block contents, linked by their hash_lock_node fields.
232          */
233         struct list_head duplicate_ring;
234
235         /* The number of data_vios sharing this lock instance */
236         data_vio_count_t reference_count;
237
238         /* The maximum value of reference_count in the lifetime of this lock */
239         data_vio_count_t max_references;
240
241         /* The current state of this lock */
242         enum hash_lock_state state;
243
244         /* True if the UDS index should be updated with new advice */
245         bool update_advice;
246
247         /* True if the advice has been verified to be a true duplicate */
248         bool verified;
249
250         /* True if the lock has already accounted for an initial verification */
251         bool verify_counted;
252
253         /* True if this lock is registered in the lock map (cleared on rollover) */
254         bool registered;
255
256         /*
257          * If verified is false, this is the location of a possible duplicate. If verified is true,
258          * it is the verified location of a true duplicate.
259          */
260         struct zoned_pbn duplicate;
261
262         /* The PBN lock on the block containing the duplicate data */
263         struct pbn_lock *duplicate_lock;
264
265         /* The data_vio designated to act on behalf of the lock */
266         struct data_vio *agent;
267
268         /*
269          * Other data_vios with data identical to the agent who are currently waiting for the agent
270          * to get the information they all need to deduplicate--either against each other, or
271          * against an existing duplicate on disk.
272          */
273         struct vdo_wait_queue waiters;
274 };
275
276 enum {
277         LOCK_POOL_CAPACITY = MAXIMUM_VDO_USER_VIOS,
278 };
279
280 struct hash_zones {
281         struct action_manager *manager;
282         struct kobject dedupe_directory;
283         struct uds_parameters parameters;
284         struct uds_index_session *index_session;
285         struct ratelimit_state ratelimiter;
286         atomic64_t timeouts;
287         atomic64_t dedupe_context_busy;
288
289         /* This spinlock protects the state fields and the starting of dedupe requests. */
290         spinlock_t lock;
291
292         /* The fields in the next block are all protected by the lock */
293         struct vdo_completion completion;
294         enum index_state index_state;
295         enum index_state index_target;
296         struct admin_state state;
297         bool changing;
298         bool create_flag;
299         bool dedupe_flag;
300         bool error_flag;
301         u64 reported_timeouts;
302
303         /* The number of zones */
304         zone_count_t zone_count;
305         /* The hash zones themselves */
306         struct hash_zone zones[];
307 };
308
309 /* These are in milliseconds. */
310 unsigned int vdo_dedupe_index_timeout_interval = 5000;
311 unsigned int vdo_dedupe_index_min_timer_interval = 100;
312 /* Same two variables, in jiffies for easier consumption. */
313 static u64 vdo_dedupe_index_timeout_jiffies;
314 static u64 vdo_dedupe_index_min_timer_jiffies;
315
316 static inline struct hash_zone *as_hash_zone(struct vdo_completion *completion)
317 {
318         vdo_assert_completion_type(completion, VDO_HASH_ZONE_COMPLETION);
319         return container_of(completion, struct hash_zone, completion);
320 }
321
322 static inline struct hash_zones *as_hash_zones(struct vdo_completion *completion)
323 {
324         vdo_assert_completion_type(completion, VDO_HASH_ZONES_COMPLETION);
325         return container_of(completion, struct hash_zones, completion);
326 }
327
328 static inline void assert_in_hash_zone(struct hash_zone *zone, const char *name)
329 {
330         ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id),
331                         "%s called on hash zone thread", name);
332 }
333
334 static inline bool change_context_state(struct dedupe_context *context, int old, int new)
335 {
336         return (atomic_cmpxchg(&context->state, old, new) == old);
337 }
338
339 static inline bool change_timer_state(struct hash_zone *zone, int old, int new)
340 {
341         return (atomic_cmpxchg(&zone->timer_state, old, new) == old);
342 }
343
344 /**
345  * return_hash_lock_to_pool() - (Re)initialize a hash lock and return it to its pool.
346  * @zone: The zone from which the lock was borrowed.
347  * @lock: The lock that is no longer in use.
348  */
349 static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock)
350 {
351         memset(lock, 0, sizeof(*lock));
352         INIT_LIST_HEAD(&lock->pool_node);
353         INIT_LIST_HEAD(&lock->duplicate_ring);
354         vdo_waitq_init(&lock->waiters);
355         list_add_tail(&lock->pool_node, &zone->lock_pool);
356 }
357
358 /**
359  * vdo_get_duplicate_lock() - Get the PBN lock on the duplicate data location for a data_vio from
360  *                            the hash_lock the data_vio holds (if there is one).
361  * @data_vio: The data_vio to query.
362  *
363  * Return: The PBN lock on the data_vio's duplicate location.
364  */
365 struct pbn_lock *vdo_get_duplicate_lock(struct data_vio *data_vio)
366 {
367         if (data_vio->hash_lock == NULL)
368                 return NULL;
369
370         return data_vio->hash_lock->duplicate_lock;
371 }
372
373 /**
374  * hash_lock_key() - Return hash_lock's record name as a hash code.
375  * @lock: The hash lock.
376  *
377  * Return: The key to use for the int map.
378  */
379 static inline u64 hash_lock_key(struct hash_lock *lock)
380 {
381         return get_unaligned_le64(&lock->hash.name);
382 }
383
384 /**
385  * get_hash_lock_state_name() - Get the string representation of a hash lock state.
386  * @state: The hash lock state.
387  *
388  * Return: The short string representing the state
389  */
390 static const char *get_hash_lock_state_name(enum hash_lock_state state)
391 {
392         /* Catch if a state has been added without updating the name array. */
393         BUILD_BUG_ON((VDO_HASH_LOCK_BYPASSING + 1) != ARRAY_SIZE(LOCK_STATE_NAMES));
394         return (state < ARRAY_SIZE(LOCK_STATE_NAMES)) ? LOCK_STATE_NAMES[state] : "INVALID";
395 }
396
397 /**
398  * assert_hash_lock_agent() - Assert that a data_vio is the agent of its hash lock, and that this
399  *                            is being called in the hash zone.
400  * @data_vio: The data_vio expected to be the lock agent.
401  * @where: A string describing the function making the assertion.
402  */
403 static void assert_hash_lock_agent(struct data_vio *data_vio, const char *where)
404 {
405         /* Not safe to access the agent field except from the hash zone. */
406         assert_data_vio_in_hash_zone(data_vio);
407         ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent,
408                         "%s must be for the hash lock agent", where);
409 }
410
411 /**
412  * set_duplicate_lock() - Set the duplicate lock held by a hash lock. May only be called in the
413  *                        physical zone of the PBN lock.
414  * @hash_lock: The hash lock to update.
415  * @pbn_lock: The PBN read lock to use as the duplicate lock.
416  */
417 static void set_duplicate_lock(struct hash_lock *hash_lock, struct pbn_lock *pbn_lock)
418 {
419         ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL),
420                         "hash lock must not already hold a duplicate lock");
421
422         pbn_lock->holder_count += 1;
423         hash_lock->duplicate_lock = pbn_lock;
424 }
425
426 /**
427  * dequeue_lock_waiter() - Remove the first data_vio from the lock's waitq and return it.
428  * @lock: The lock containing the wait queue.
429  *
430  * Return: The first (oldest) waiter in the queue, or NULL if the queue is empty.
431  */
432 static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock)
433 {
434         return vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
435 }
436
437 /**
438  * set_hash_lock() - Set, change, or clear the hash lock a data_vio is using.
439  * @data_vio: The data_vio to update.
440  * @new_lock: The hash lock the data_vio is joining.
441  *
442  * Updates the hash lock (or locks) to reflect the change in membership.
443  */
444 static void set_hash_lock(struct data_vio *data_vio, struct hash_lock *new_lock)
445 {
446         struct hash_lock *old_lock = data_vio->hash_lock;
447
448         if (old_lock != NULL) {
449                 ASSERT_LOG_ONLY(data_vio->hash_zone != NULL,
450                                 "must have a hash zone when holding a hash lock");
451                 ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry),
452                                 "must be on a hash lock ring when holding a hash lock");
453                 ASSERT_LOG_ONLY(old_lock->reference_count > 0,
454                                 "hash lock reference must be counted");
455
456                 if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) &&
457                     (old_lock->state != VDO_HASH_LOCK_UNLOCKING)) {
458                         /*
459                          * If the reference count goes to zero in a non-terminal state, we're most
460                          * likely leaking this lock.
461                          */
462                         ASSERT_LOG_ONLY(old_lock->reference_count > 1,
463                                         "hash locks should only become unreferenced in a terminal state, not state %s",
464                                         get_hash_lock_state_name(old_lock->state));
465                 }
466
467                 list_del_init(&data_vio->hash_lock_entry);
468                 old_lock->reference_count -= 1;
469
470                 data_vio->hash_lock = NULL;
471         }
472
473         if (new_lock != NULL) {
474                 /*
475                  * Keep all data_vios sharing the lock on a ring since they can complete in any
476                  * order and we'll always need a pointer to one to compare data.
477                  */
478                 list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_ring);
479                 new_lock->reference_count += 1;
480                 if (new_lock->max_references < new_lock->reference_count)
481                         new_lock->max_references = new_lock->reference_count;
482
483                 data_vio->hash_lock = new_lock;
484         }
485 }
486
487 /* There are loops in the state diagram, so some forward decl's are needed. */
488 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
489                            bool agent_is_done);
490 static void start_locking(struct hash_lock *lock, struct data_vio *agent);
491 static void start_writing(struct hash_lock *lock, struct data_vio *agent);
492 static void unlock_duplicate_pbn(struct vdo_completion *completion);
493 static void transfer_allocation_lock(struct data_vio *data_vio);
494
495 /**
496  * exit_hash_lock() - Bottleneck for data_vios that have written or deduplicated and that are no
497  *                    longer needed to be an agent for the hash lock.
498  * @data_vio: The data_vio to complete and send to be cleaned up.
499  */
500 static void exit_hash_lock(struct data_vio *data_vio)
501 {
502         /* Release the hash lock now, saving a thread transition in cleanup. */
503         vdo_release_hash_lock(data_vio);
504
505         /* Complete the data_vio and start the clean-up path to release any locks it still holds. */
506         data_vio->vio.completion.callback = complete_data_vio;
507
508         continue_data_vio(data_vio);
509 }
510
511 /**
512  * set_duplicate_location() - Set the location of the duplicate block for data_vio, updating the
513  *                            is_duplicate and duplicate fields from a zoned_pbn.
514  * @data_vio: The data_vio to modify.
515  * @source: The location of the duplicate.
516  */
517 static void set_duplicate_location(struct data_vio *data_vio,
518                                    const struct zoned_pbn source)
519 {
520         data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK);
521         data_vio->duplicate = source;
522 }
523
524 /**
525  * retire_lock_agent() - Retire the active lock agent, replacing it with the first lock waiter, and
526  *                       make the retired agent exit the hash lock.
527  * @lock: The hash lock to update.
528  *
529  * Return: The new lock agent (which will be NULL if there was no waiter)
530  */
531 static struct data_vio *retire_lock_agent(struct hash_lock *lock)
532 {
533         struct data_vio *old_agent = lock->agent;
534         struct data_vio *new_agent = dequeue_lock_waiter(lock);
535
536         lock->agent = new_agent;
537         exit_hash_lock(old_agent);
538         if (new_agent != NULL)
539                 set_duplicate_location(new_agent, lock->duplicate);
540         return new_agent;
541 }
542
543 /**
544  * wait_on_hash_lock() - Add a data_vio to the lock's queue of waiters.
545  * @lock: The hash lock on which to wait.
546  * @data_vio: The data_vio to add to the queue.
547  */
548 static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio)
549 {
550         vdo_waitq_enqueue_waiter(&lock->waiters, &data_vio->waiter);
551
552         /*
553          * Make sure the agent doesn't block indefinitely in the packer since it now has at least
554          * one other data_vio waiting on it.
555          */
556         if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent))
557                 return;
558
559         /*
560          * Even though we're waiting, we also have to send ourselves as a one-way message to the
561          * packer to ensure the agent continues executing. This is safe because
562          * cancel_vio_compression() guarantees the agent won't continue executing until this
563          * message arrives in the packer, and because the wait queue link isn't used for sending
564          * the message.
565          */
566         data_vio->compression.lock_holder = lock->agent;
567         launch_data_vio_packer_callback(data_vio, vdo_remove_lock_holder_from_packer);
568 }
569
570 /**
571  * abort_waiter() - waiter_callback_fn function that shunts waiters to write their blocks without
572  *                  optimization.
573  * @waiter: The data_vio's waiter link.
574  * @context: Not used.
575  */
576 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
577 {
578         write_data_vio(vdo_waiter_as_data_vio(waiter));
579 }
580
581 /**
582  * start_bypassing() - Stop using the hash lock.
583  * @lock: The hash lock.
584  * @agent: The data_vio acting as the agent for the lock.
585  *
586  * Stops using the hash lock. This is the final transition for hash locks which did not get an
587  * error.
588  */
589 static void start_bypassing(struct hash_lock *lock, struct data_vio *agent)
590 {
591         lock->state = VDO_HASH_LOCK_BYPASSING;
592         exit_hash_lock(agent);
593 }
594
595 void vdo_clean_failed_hash_lock(struct data_vio *data_vio)
596 {
597         struct hash_lock *lock = data_vio->hash_lock;
598
599         if (lock->state == VDO_HASH_LOCK_BYPASSING) {
600                 exit_hash_lock(data_vio);
601                 return;
602         }
603
604         if (lock->agent == NULL) {
605                 lock->agent = data_vio;
606         } else if (data_vio != lock->agent) {
607                 exit_hash_lock(data_vio);
608                 return;
609         }
610
611         lock->state = VDO_HASH_LOCK_BYPASSING;
612
613         /* Ensure we don't attempt to update advice when cleaning up. */
614         lock->update_advice = false;
615
616         vdo_waitq_notify_all_waiters(&lock->waiters, abort_waiter, NULL);
617
618         if (lock->duplicate_lock != NULL) {
619                 /* The agent must reference the duplicate zone to launch it. */
620                 data_vio->duplicate = lock->duplicate;
621                 launch_data_vio_duplicate_zone_callback(data_vio, unlock_duplicate_pbn);
622                 return;
623         }
624
625         lock->agent = NULL;
626         data_vio->is_duplicate = false;
627         exit_hash_lock(data_vio);
628 }
629
630 /**
631  * finish_unlocking() - Handle the result of the agent for the lock releasing a read lock on
632  *                      duplicate candidate.
633  * @completion: The completion of the data_vio acting as the lock's agent.
634  *
635  * This continuation is registered in unlock_duplicate_pbn().
636  */
637 static void finish_unlocking(struct vdo_completion *completion)
638 {
639         struct data_vio *agent = as_data_vio(completion);
640         struct hash_lock *lock = agent->hash_lock;
641
642         assert_hash_lock_agent(agent, __func__);
643
644         ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
645                         "must have released the duplicate lock for the hash lock");
646
647         if (!lock->verified) {
648                 /*
649                  * UNLOCKING -> WRITING transition: The lock we released was on an unverified
650                  * block, so it must have been a lock on advice we were verifying, not on a
651                  * location that was used for deduplication. Go write (or compress) the block to
652                  * get a location to dedupe against.
653                  */
654                 start_writing(lock, agent);
655                 return;
656         }
657
658         /*
659          * With the lock released, the verified duplicate block may already have changed and will
660          * need to be re-verified if a waiter arrived.
661          */
662         lock->verified = false;
663
664         if (vdo_waitq_has_waiters(&lock->waiters)) {
665                 /*
666                  * UNLOCKING -> LOCKING transition: A new data_vio entered the hash lock while the
667                  * agent was releasing the PBN lock. The current agent exits and the waiter has to
668                  * re-lock and re-verify the duplicate location.
669                  *
670                  * TODO: If we used the current agent to re-acquire the PBN lock we wouldn't need
671                  * to re-verify.
672                  */
673                 agent = retire_lock_agent(lock);
674                 start_locking(lock, agent);
675                 return;
676         }
677
678         /*
679          * UNLOCKING -> BYPASSING transition: The agent is done with the lock and no other
680          * data_vios reference it, so remove it from the lock map and return it to the pool.
681          */
682         start_bypassing(lock, agent);
683 }
684
685 /**
686  * unlock_duplicate_pbn() - Release a read lock on the PBN of the block that may or may not have
687  *                          contained duplicate data.
688  * @completion: The completion of the data_vio acting as the lock's agent.
689  *
690  * This continuation is launched by start_unlocking(), and calls back to finish_unlocking() on the
691  * hash zone thread.
692  */
693 static void unlock_duplicate_pbn(struct vdo_completion *completion)
694 {
695         struct data_vio *agent = as_data_vio(completion);
696         struct hash_lock *lock = agent->hash_lock;
697
698         assert_data_vio_in_duplicate_zone(agent);
699         ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
700                         "must have a duplicate lock to release");
701
702         vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn,
703                                            uds_forget(lock->duplicate_lock));
704         if (lock->state == VDO_HASH_LOCK_BYPASSING) {
705                 complete_data_vio(completion);
706                 return;
707         }
708
709         launch_data_vio_hash_zone_callback(agent, finish_unlocking);
710 }
711
712 /**
713  * start_unlocking() - Release a read lock on the PBN of the block that may or may not have
714  *                     contained duplicate data.
715  * @lock: The hash lock.
716  * @agent: The data_vio currently acting as the agent for the lock.
717  */
718 static void start_unlocking(struct hash_lock *lock, struct data_vio *agent)
719 {
720         lock->state = VDO_HASH_LOCK_UNLOCKING;
721         launch_data_vio_duplicate_zone_callback(agent, unlock_duplicate_pbn);
722 }
723
724 static void release_context(struct dedupe_context *context)
725 {
726         struct hash_zone *zone = context->zone;
727
728         WRITE_ONCE(zone->active, zone->active - 1);
729         list_move(&context->list_entry, &zone->available);
730 }
731
732 static void process_update_result(struct data_vio *agent)
733 {
734         struct dedupe_context *context = agent->dedupe_context;
735
736         if ((context == NULL) ||
737             !change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE))
738                 return;
739
740         release_context(context);
741 }
742
743 /**
744  * finish_updating() - Process the result of a UDS update performed by the agent for the lock.
745  * @completion: The completion of the data_vio that performed the update
746  *
747  * This continuation is registered in start_querying().
748  */
749 static void finish_updating(struct vdo_completion *completion)
750 {
751         struct data_vio *agent = as_data_vio(completion);
752         struct hash_lock *lock = agent->hash_lock;
753
754         assert_hash_lock_agent(agent, __func__);
755
756         process_update_result(agent);
757
758         /*
759          * UDS was updated successfully, so don't update again unless the duplicate location
760          * changes due to rollover.
761          */
762         lock->update_advice = false;
763
764         if (vdo_waitq_has_waiters(&lock->waiters)) {
765                 /*
766                  * UPDATING -> DEDUPING transition: A new data_vio arrived during the UDS update.
767                  * Send it on the verified dedupe path. The agent is done with the lock, but the
768                  * lock may still need to use it to clean up after rollover.
769                  */
770                 start_deduping(lock, agent, true);
771                 return;
772         }
773
774         if (lock->duplicate_lock != NULL) {
775                 /*
776                  * UPDATING -> UNLOCKING transition: No one is waiting to dedupe, but we hold a
777                  * duplicate PBN lock, so go release it.
778                  */
779                 start_unlocking(lock, agent);
780                 return;
781         }
782
783         /*
784          * UPDATING -> BYPASSING transition: No one is waiting to dedupe and there's no lock to
785          * release.
786          */
787         start_bypassing(lock, agent);
788 }
789
790 static void query_index(struct data_vio *data_vio, enum uds_request_type operation);
791
792 /**
793  * start_updating() - Continue deduplication with the last step, updating UDS with the location of
794  *                    the duplicate that should be returned as advice in the future.
795  * @lock: The hash lock.
796  * @agent: The data_vio currently acting as the agent for the lock.
797  */
798 static void start_updating(struct hash_lock *lock, struct data_vio *agent)
799 {
800         lock->state = VDO_HASH_LOCK_UPDATING;
801
802         ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified");
803         ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed");
804
805         agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX;
806         set_data_vio_hash_zone_callback(agent, finish_updating);
807         query_index(agent, UDS_UPDATE);
808 }
809
810 /**
811  * finish_deduping() - Handle a data_vio that has finished deduplicating against the block locked
812  *                     by the hash lock.
813  * @lock: The hash lock.
814  * @data_vio: The lock holder that has finished deduplicating.
815  *
816  * If there are other data_vios still sharing the lock, this will just release the data_vio's share
817  * of the lock and finish processing the data_vio. If this is the last data_vio holding the lock,
818  * this makes the data_vio the lock agent and uses it to advance the state of the lock so it can
819  * eventually be released.
820  */
821 static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio)
822 {
823         struct data_vio *agent = data_vio;
824
825         ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING");
826         ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
827                         "shouldn't have any lock waiters in DEDUPING");
828
829         /* Just release the lock reference if other data_vios are still deduping. */
830         if (lock->reference_count > 1) {
831                 exit_hash_lock(data_vio);
832                 return;
833         }
834
835         /* The hash lock must have an agent for all other lock states. */
836         lock->agent = agent;
837         if (lock->update_advice) {
838                 /*
839                  * DEDUPING -> UPDATING transition: The location of the duplicate block changed
840                  * since the initial UDS query because of compression, rollover, or because the
841                  * query agent didn't have an allocation. The UDS update was delayed in case there
842                  * was another change in location, but with only this data_vio using the hash lock,
843                  * it's time to update the advice.
844                  */
845                 start_updating(lock, agent);
846         } else {
847                 /*
848                  * DEDUPING -> UNLOCKING transition: Release the PBN read lock on the duplicate
849                  * location so the hash lock itself can be released (contingent on no new data_vios
850                  * arriving in the lock before the agent returns).
851                  */
852                 start_unlocking(lock, agent);
853         }
854 }
855
856 /**
857  * acquire_lock() - Get the lock for a record name.
858  * @zone: The zone responsible for the hash.
859  * @hash: The hash to lock.
860  * @replace_lock: If non-NULL, the lock already registered for the hash which should be replaced by
861  *                the new lock.
862  * @lock_ptr: A pointer to receive the hash lock.
863  *
864  * Gets the lock for the hash (record name) of the data in a data_vio, or if one does not exist (or
865  * if we are explicitly rolling over), initialize a new lock for the hash and register it in the
866  * zone. This must only be called in the correct thread for the zone.
867  *
868  * Return: VDO_SUCCESS or an error code.
869  */
870 static int __must_check acquire_lock(struct hash_zone *zone,
871                                      const struct uds_record_name *hash,
872                                      struct hash_lock *replace_lock,
873                                      struct hash_lock **lock_ptr)
874 {
875         struct hash_lock *lock, *new_lock;
876         int result;
877
878         /*
879          * Borrow and prepare a lock from the pool so we don't have to do two int_map accesses
880          * in the common case of no lock contention.
881          */
882         result = ASSERT(!list_empty(&zone->lock_pool),
883                         "never need to wait for a free hash lock");
884         if (result != VDO_SUCCESS)
885                 return result;
886
887         new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node);
888         list_del_init(&new_lock->pool_node);
889
890         /*
891          * Fill in the hash of the new lock so we can map it, since we have to use the hash as the
892          * map key.
893          */
894         new_lock->hash = *hash;
895
896         result = vdo_int_map_put(zone->hash_lock_map, hash_lock_key(new_lock),
897                                  new_lock, (replace_lock != NULL), (void **) &lock);
898         if (result != VDO_SUCCESS) {
899                 return_hash_lock_to_pool(zone, uds_forget(new_lock));
900                 return result;
901         }
902
903         if (replace_lock != NULL) {
904                 /* On mismatch put the old lock back and return a severe error */
905                 ASSERT_LOG_ONLY(lock == replace_lock,
906                                 "old lock must have been in the lock map");
907                 /* TODO: Check earlier and bail out? */
908                 ASSERT_LOG_ONLY(replace_lock->registered,
909                                 "old lock must have been marked registered");
910                 replace_lock->registered = false;
911         }
912
913         if (lock == replace_lock) {
914                 lock = new_lock;
915                 lock->registered = true;
916         } else {
917                 /* There's already a lock for the hash, so we don't need the borrowed lock. */
918                 return_hash_lock_to_pool(zone, uds_forget(new_lock));
919         }
920
921         *lock_ptr = lock;
922         return VDO_SUCCESS;
923 }
924
925 /**
926  * enter_forked_lock() - Bind the data_vio to a new hash lock.
927  *
928  * Implements waiter_callback_fn. Binds the data_vio that was waiting to a new hash lock and waits
929  * on that lock.
930  */
931 static void enter_forked_lock(struct vdo_waiter *waiter, void *context)
932 {
933         struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
934         struct hash_lock *new_lock = context;
935
936         set_hash_lock(data_vio, new_lock);
937         wait_on_hash_lock(new_lock, data_vio);
938 }
939
940 /**
941  * fork_hash_lock() - Fork a hash lock because it has run out of increments on the duplicate PBN.
942  * @old_lock: The hash lock to fork.
943  * @new_agent: The data_vio that will be the agent for the new lock.
944  *
945  * Transfers the new agent and any lock waiters to a new hash lock instance which takes the place
946  * of the old lock in the lock map. The old lock remains active, but will not update advice.
947  */
948 static void fork_hash_lock(struct hash_lock *old_lock, struct data_vio *new_agent)
949 {
950         struct hash_lock *new_lock;
951         int result;
952
953         result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock,
954                               &new_lock);
955         if (result != VDO_SUCCESS) {
956                 continue_data_vio_with_error(new_agent, result);
957                 return;
958         }
959
960         /*
961          * Only one of the two locks should update UDS. The old lock is out of references, so it
962          * would be poor dedupe advice in the short term.
963          */
964         old_lock->update_advice = false;
965         new_lock->update_advice = true;
966
967         set_hash_lock(new_agent, new_lock);
968         new_lock->agent = new_agent;
969
970         vdo_waitq_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock);
971
972         new_agent->is_duplicate = false;
973         start_writing(new_lock, new_agent);
974 }
975
976 /**
977  * launch_dedupe() - Reserve a reference count increment for a data_vio and launch it on the dedupe
978  *                   path.
979  * @lock: The hash lock.
980  * @data_vio: The data_vio to deduplicate using the hash lock.
981  * @has_claim: true if the data_vio already has claimed an increment from the duplicate lock.
982  *
983  * If no increments are available, this will roll over to a new hash lock and launch the data_vio
984  * as the writing agent for that lock.
985  */
986 static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio,
987                           bool has_claim)
988 {
989         if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
990                 /* Out of increments, so must roll over to a new lock. */
991                 fork_hash_lock(lock, data_vio);
992                 return;
993         }
994
995         /* Deduplicate against the lock's verified location. */
996         set_duplicate_location(data_vio, lock->duplicate);
997         data_vio->new_mapped = data_vio->duplicate;
998         update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock);
999 }
1000
1001 /**
1002  * start_deduping() - Enter the hash lock state where data_vios deduplicate in parallel against a
1003  *                    true copy of their data on disk.
1004  * @lock: The hash lock.
1005  * @agent: The data_vio acting as the agent for the lock.
1006  * @agent_is_done: true only if the agent has already written or deduplicated against its data.
1007  *
1008  * If the agent itself needs to deduplicate, an increment for it must already have been claimed
1009  * from the duplicate lock, ensuring the hash lock will still have a data_vio holding it.
1010  */
1011 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
1012                            bool agent_is_done)
1013 {
1014         lock->state = VDO_HASH_LOCK_DEDUPING;
1015
1016         /*
1017          * We don't take the downgraded allocation lock from the agent unless we actually need to
1018          * deduplicate against it.
1019          */
1020         if (lock->duplicate_lock == NULL) {
1021                 ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state),
1022                                 "compression must have shared a lock");
1023                 ASSERT_LOG_ONLY(agent_is_done,
1024                                 "agent must have written the new duplicate");
1025                 transfer_allocation_lock(agent);
1026         }
1027
1028         ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock),
1029                         "duplicate_lock must be a PBN read lock");
1030
1031         /*
1032          * This state is not like any of the other states. There is no designated agent--the agent
1033          * transitioning to this state and all the waiters will be launched to deduplicate in
1034          * parallel.
1035          */
1036         lock->agent = NULL;
1037
1038         /*
1039          * Launch the agent (if not already deduplicated) and as many lock waiters as we have
1040          * available increments for on the dedupe path. If we run out of increments, rollover will
1041          * be triggered and the remaining waiters will be transferred to the new lock.
1042          */
1043         if (!agent_is_done) {
1044                 launch_dedupe(lock, agent, true);
1045                 agent = NULL;
1046         }
1047         while (vdo_waitq_has_waiters(&lock->waiters))
1048                 launch_dedupe(lock, dequeue_lock_waiter(lock), false);
1049
1050         if (agent_is_done) {
1051                 /*
1052                  * In the degenerate case where all the waiters rolled over to a new lock, this
1053                  * will continue to use the old agent to clean up this lock, and otherwise it just
1054                  * lets the agent exit the lock.
1055                  */
1056                 finish_deduping(lock, agent);
1057         }
1058 }
1059
1060 /**
1061  * increment_stat() - Increment a statistic counter in a non-atomic yet thread-safe manner.
1062  * @stat: The statistic field to increment.
1063  */
1064 static void increment_stat(u64 *stat)
1065 {
1066         /*
1067          * Must only be mutated on the hash zone thread. Prevents any compiler shenanigans from
1068          * affecting other threads reading stats.
1069          */
1070         WRITE_ONCE(*stat, *stat + 1);
1071 }
1072
1073 /**
1074  * finish_verifying() - Handle the result of the agent for the lock comparing its data to the
1075  *                      duplicate candidate.
1076  * @completion: The completion of the data_vio used to verify dedupe
1077  *
1078  * This continuation is registered in start_verifying().
1079  */
1080 static void finish_verifying(struct vdo_completion *completion)
1081 {
1082         struct data_vio *agent = as_data_vio(completion);
1083         struct hash_lock *lock = agent->hash_lock;
1084
1085         assert_hash_lock_agent(agent, __func__);
1086
1087         lock->verified = agent->is_duplicate;
1088
1089         /*
1090          * Only count the result of the initial verification of the advice as valid or stale, and
1091          * not any re-verifications due to PBN lock releases.
1092          */
1093         if (!lock->verify_counted) {
1094                 lock->verify_counted = true;
1095                 if (lock->verified)
1096                         increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid);
1097                 else
1098                         increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1099         }
1100
1101         /*
1102          * Even if the block is a verified duplicate, we can't start to deduplicate unless we can
1103          * claim a reference count increment for the agent.
1104          */
1105         if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1106                 agent->is_duplicate = false;
1107                 lock->verified = false;
1108         }
1109
1110         if (lock->verified) {
1111                 /*
1112                  * VERIFYING -> DEDUPING transition: The advice is for a true duplicate, so start
1113                  * deduplicating against it, if references are available.
1114                  */
1115                 start_deduping(lock, agent, false);
1116         } else {
1117                 /*
1118                  * VERIFYING -> UNLOCKING transition: Either the verify failed or we'd try to
1119                  * dedupe and roll over immediately, which would fail because it would leave the
1120                  * lock without an agent to release the PBN lock. In both cases, the data will have
1121                  * to be written or compressed, but first the advice PBN must be unlocked by the
1122                  * VERIFYING agent.
1123                  */
1124                 lock->update_advice = true;
1125                 start_unlocking(lock, agent);
1126         }
1127 }
1128
1129 static bool blocks_equal(char *block1, char *block2)
1130 {
1131         int i;
1132
1133
1134         for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
1135                 if (*((u64 *) &block1[i]) != *((u64 *) &block2[i]))
1136                         return false;
1137         }
1138
1139         return true;
1140 }
1141
1142 static void verify_callback(struct vdo_completion *completion)
1143 {
1144         struct data_vio *agent = as_data_vio(completion);
1145
1146         agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block);
1147         launch_data_vio_hash_zone_callback(agent, finish_verifying);
1148 }
1149
1150 static void uncompress_and_verify(struct vdo_completion *completion)
1151 {
1152         struct data_vio *agent = as_data_vio(completion);
1153         int result;
1154
1155         result = uncompress_data_vio(agent, agent->duplicate.state,
1156                                      agent->scratch_block);
1157         if (result == VDO_SUCCESS) {
1158                 verify_callback(completion);
1159                 return;
1160         }
1161
1162         agent->is_duplicate = false;
1163         launch_data_vio_hash_zone_callback(agent, finish_verifying);
1164 }
1165
1166 static void verify_endio(struct bio *bio)
1167 {
1168         struct data_vio *agent = vio_as_data_vio(bio->bi_private);
1169         int result = blk_status_to_errno(bio->bi_status);
1170
1171         vdo_count_completed_bios(bio);
1172         if (result != VDO_SUCCESS) {
1173                 agent->is_duplicate = false;
1174                 launch_data_vio_hash_zone_callback(agent, finish_verifying);
1175                 return;
1176         }
1177
1178         if (vdo_is_state_compressed(agent->duplicate.state)) {
1179                 launch_data_vio_cpu_callback(agent, uncompress_and_verify,
1180                                              CPU_Q_COMPRESS_BLOCK_PRIORITY);
1181                 return;
1182         }
1183
1184         launch_data_vio_cpu_callback(agent, verify_callback,
1185                                      CPU_Q_COMPLETE_READ_PRIORITY);
1186 }
1187
1188 /**
1189  * start_verifying() - Begin the data verification phase.
1190  * @lock: The hash lock (must be LOCKING).
1191  * @agent: The data_vio to use to read and compare candidate data.
1192  *
1193  * Continue the deduplication path for a hash lock by using the agent to read (and possibly
1194  * decompress) the data at the candidate duplicate location, comparing it to the data in the agent
1195  * to verify that the candidate is identical to all the data_vios sharing the hash. If so, it can
1196  * be deduplicated against, otherwise a data_vio allocation will have to be written to and used for
1197  * dedupe.
1198  */
1199 static void start_verifying(struct hash_lock *lock, struct data_vio *agent)
1200 {
1201         int result;
1202         struct vio *vio = &agent->vio;
1203         char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ?
1204                         (char *) agent->compression.block :
1205                         agent->scratch_block);
1206
1207         lock->state = VDO_HASH_LOCK_VERIFYING;
1208         ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once");
1209
1210         agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION;
1211         result = vio_reset_bio(vio, buffer, verify_endio, REQ_OP_READ,
1212                                agent->duplicate.pbn);
1213         if (result != VDO_SUCCESS) {
1214                 set_data_vio_hash_zone_callback(agent, finish_verifying);
1215                 continue_data_vio_with_error(agent, result);
1216                 return;
1217         }
1218
1219         set_data_vio_bio_zone_callback(agent, vdo_submit_vio);
1220         vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY);
1221 }
1222
1223 /**
1224  * finish_locking() - Handle the result of the agent for the lock attempting to obtain a PBN read
1225  *                    lock on the candidate duplicate block.
1226  * @completion: The completion of the data_vio that attempted to get the read lock.
1227  *
1228  * This continuation is registered in lock_duplicate_pbn().
1229  */
1230 static void finish_locking(struct vdo_completion *completion)
1231 {
1232         struct data_vio *agent = as_data_vio(completion);
1233         struct hash_lock *lock = agent->hash_lock;
1234
1235         assert_hash_lock_agent(agent, __func__);
1236
1237         if (!agent->is_duplicate) {
1238                 ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1239                                 "must not hold duplicate_lock if not flagged as a duplicate");
1240                 /*
1241                  * LOCKING -> WRITING transition: The advice block is being modified or has no
1242                  * available references, so try to write or compress the data, remembering to
1243                  * update UDS later with the new advice.
1244                  */
1245                 increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1246                 lock->update_advice = true;
1247                 start_writing(lock, agent);
1248                 return;
1249         }
1250
1251         ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
1252                         "must hold duplicate_lock if flagged as a duplicate");
1253
1254         if (!lock->verified) {
1255                 /*
1256                  * LOCKING -> VERIFYING transition: Continue on the unverified dedupe path, reading
1257                  * the candidate duplicate and comparing it to the agent's data to decide whether
1258                  * it is a true duplicate or stale advice.
1259                  */
1260                 start_verifying(lock, agent);
1261                 return;
1262         }
1263
1264         if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1265                 /*
1266                  * LOCKING -> UNLOCKING transition: The verified block was re-locked, but has no
1267                  * available increments left. Must first release the useless PBN read lock before
1268                  * rolling over to a new copy of the block.
1269                  */
1270                 agent->is_duplicate = false;
1271                 lock->verified = false;
1272                 lock->update_advice = true;
1273                 start_unlocking(lock, agent);
1274                 return;
1275         }
1276
1277         /*
1278          * LOCKING -> DEDUPING transition: Continue on the verified dedupe path, deduplicating
1279          * against a location that was previously verified or written to.
1280          */
1281         start_deduping(lock, agent, false);
1282 }
1283
1284 static bool acquire_provisional_reference(struct data_vio *agent, struct pbn_lock *lock,
1285                                           struct slab_depot *depot)
1286 {
1287         /* Ensure that the newly-locked block is referenced. */
1288         struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn);
1289         int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock);
1290
1291         if (result == VDO_SUCCESS)
1292                 return true;
1293
1294         uds_log_warning_strerror(result,
1295                                  "Error acquiring provisional reference for dedupe candidate; aborting dedupe");
1296         agent->is_duplicate = false;
1297         vdo_release_physical_zone_pbn_lock(agent->duplicate.zone,
1298                                            agent->duplicate.pbn, lock);
1299         continue_data_vio_with_error(agent, result);
1300         return false;
1301 }
1302
1303 /**
1304  * lock_duplicate_pbn() - Acquire a read lock on the PBN of the block containing candidate
1305  *                        duplicate data (compressed or uncompressed).
1306  * @completion: The completion of the data_vio attempting to acquire the physical block lock on
1307  *              behalf of its hash lock.
1308  *
1309  * If the PBN is already locked for writing, the lock attempt is abandoned and is_duplicate will be
1310  * cleared before calling back. this continuation is launched from start_locking(), and calls back
1311  * to finish_locking() on the hash zone thread.
1312  */
1313 static void lock_duplicate_pbn(struct vdo_completion *completion)
1314 {
1315         unsigned int increment_limit;
1316         struct pbn_lock *lock;
1317         int result;
1318
1319         struct data_vio *agent = as_data_vio(completion);
1320         struct slab_depot *depot = vdo_from_data_vio(agent)->depot;
1321         struct physical_zone *zone = agent->duplicate.zone;
1322
1323         assert_data_vio_in_duplicate_zone(agent);
1324
1325         set_data_vio_hash_zone_callback(agent, finish_locking);
1326
1327         /*
1328          * While in the zone that owns it, find out how many additional references can be made to
1329          * the block if it turns out to truly be a duplicate.
1330          */
1331         increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn);
1332         if (increment_limit == 0) {
1333                 /*
1334                  * We could deduplicate against it later if a reference happened to be released
1335                  * during verification, but it's probably better to bail out now.
1336                  */
1337                 agent->is_duplicate = false;
1338                 continue_data_vio(agent);
1339                 return;
1340         }
1341
1342         result = vdo_attempt_physical_zone_pbn_lock(zone, agent->duplicate.pbn,
1343                                                     VIO_READ_LOCK, &lock);
1344         if (result != VDO_SUCCESS) {
1345                 continue_data_vio_with_error(agent, result);
1346                 return;
1347         }
1348
1349         if (!vdo_is_pbn_read_lock(lock)) {
1350                 /*
1351                  * There are three cases of write locks: uncompressed data block writes, compressed
1352                  * (packed) block writes, and block map page writes. In all three cases, we give up
1353                  * on trying to verify the advice and don't bother to try deduplicate against the
1354                  * data in the write lock holder.
1355                  *
1356                  * 1) We don't ever want to try to deduplicate against a block map page.
1357                  *
1358                  * 2a) It's very unlikely we'd deduplicate against an entire packed block, both
1359                  * because of the chance of matching it, and because we don't record advice for it,
1360                  * but for the uncompressed representation of all the fragments it contains. The
1361                  * only way we'd be getting lock contention is if we've written the same
1362                  * representation coincidentally before, had it become unreferenced, and it just
1363                  * happened to be packed together from compressed writes when we go to verify the
1364                  * lucky advice. Giving up is a minuscule loss of potential dedupe.
1365                  *
1366                  * 2b) If the advice is for a slot of a compressed block, it's about to get
1367                  * smashed, and the write smashing it cannot contain our data--it would have to be
1368                  * writing on behalf of our hash lock, but that's impossible since we're the lock
1369                  * agent.
1370                  *
1371                  * 3a) If the lock is held by a data_vio with different data, the advice is already
1372                  * stale or is about to become stale.
1373                  *
1374                  * 3b) If the lock is held by a data_vio that matches us, we may as well either
1375                  * write it ourselves (or reference the copy we already wrote) instead of
1376                  * potentially having many duplicates wait for the lock holder to write, journal,
1377                  * hash, and finally arrive in the hash lock. We lose a chance to avoid a UDS
1378                  * update in the very rare case of advice for a free block that just happened to be
1379                  * allocated to a data_vio with the same hash. There's also a chance to save on a
1380                  * block write, at the cost of a block verify. Saving on a full block compare in
1381                  * all stale advice cases almost certainly outweighs saving a UDS update and
1382                  * trading a write for a read in a lucky case where advice would have been saved
1383                  * from becoming stale.
1384                  */
1385                 agent->is_duplicate = false;
1386                 continue_data_vio(agent);
1387                 return;
1388         }
1389
1390         if (lock->holder_count == 0) {
1391                 if (!acquire_provisional_reference(agent, lock, depot))
1392                         return;
1393
1394                 /*
1395                  * The increment limit we grabbed earlier is still valid. The lock now holds the
1396                  * rights to acquire all those references. Those rights will be claimed by hash
1397                  * locks sharing this read lock.
1398                  */
1399                 lock->increment_limit = increment_limit;
1400         }
1401
1402         /*
1403          * We've successfully acquired a read lock on behalf of the hash lock, so mark it as such.
1404          */
1405         set_duplicate_lock(agent->hash_lock, lock);
1406
1407         /*
1408          * TODO: Optimization: We could directly launch the block verify, then switch to a hash
1409          * thread.
1410          */
1411         continue_data_vio(agent);
1412 }
1413
1414 /**
1415  * start_locking() - Continue deduplication for a hash lock that has obtained valid advice of a
1416  *                   potential duplicate through its agent.
1417  * @lock: The hash lock (currently must be QUERYING).
1418  * @agent: The data_vio bearing the dedupe advice.
1419  */
1420 static void start_locking(struct hash_lock *lock, struct data_vio *agent)
1421 {
1422         ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1423                         "must not acquire a duplicate lock when already holding it");
1424
1425         lock->state = VDO_HASH_LOCK_LOCKING;
1426
1427         /*
1428          * TODO: Optimization: If we arrange to continue on the duplicate zone thread when
1429          * accepting the advice, and don't explicitly change lock states (or use an agent-local
1430          * state, or an atomic), we can avoid a thread transition here.
1431          */
1432         agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN;
1433         launch_data_vio_duplicate_zone_callback(agent, lock_duplicate_pbn);
1434 }
1435
1436 /**
1437  * finish_writing() - Re-entry point for the lock agent after it has finished writing or
1438  *                    compressing its copy of the data block.
1439  * @lock: The hash lock, which must be in state WRITING.
1440  * @agent: The data_vio that wrote its data for the lock.
1441  *
1442  * The agent will never need to dedupe against anything, so it's done with the lock, but the lock
1443  * may not be finished with it, as a UDS update might still be needed.
1444  *
1445  * If there are other lock holders, the agent will hand the job to one of them and exit, leaving
1446  * the lock to deduplicate against the just-written block. If there are no other lock holders, the
1447  * agent either exits (and later tears down the hash lock), or it remains the agent and updates
1448  * UDS.
1449  */
1450 static void finish_writing(struct hash_lock *lock, struct data_vio *agent)
1451 {
1452         /*
1453          * Dedupe against the data block or compressed block slot the agent wrote. Since we know
1454          * the write succeeded, there's no need to verify it.
1455          */
1456         lock->duplicate = agent->new_mapped;
1457         lock->verified = true;
1458
1459         if (vdo_is_state_compressed(lock->duplicate.state) &&
1460             lock->registered)
1461                 /*
1462                  * Compression means the location we gave in the UDS query is not the location
1463                  * we're using to deduplicate.
1464                  */
1465                 lock->update_advice = true;
1466
1467         /* If there are any waiters, we need to start deduping them. */
1468         if (vdo_waitq_has_waiters(&lock->waiters)) {
1469                 /*
1470                  * WRITING -> DEDUPING transition: an asynchronously-written block failed to
1471                  * compress, so the PBN lock on the written copy was already transferred. The agent
1472                  * is done with the lock, but the lock may still need to use it to clean up after
1473                  * rollover.
1474                  */
1475                 start_deduping(lock, agent, true);
1476                 return;
1477         }
1478
1479         /*
1480          * There are no waiters and the agent has successfully written, so take a step towards
1481          * being able to release the hash lock (or just release it).
1482          */
1483         if (lock->update_advice) {
1484                 /*
1485                  * WRITING -> UPDATING transition: There's no waiter and a UDS update is needed, so
1486                  * retain the WRITING agent and use it to launch the update. The happens on
1487                  * compression, rollover, or the QUERYING agent not having an allocation.
1488                  */
1489                 start_updating(lock, agent);
1490         } else if (lock->duplicate_lock != NULL) {
1491                 /*
1492                  * WRITING -> UNLOCKING transition: There's no waiter and no update needed, but the
1493                  * compressed write gave us a shared duplicate lock that we must release.
1494                  */
1495                 set_duplicate_location(agent, lock->duplicate);
1496                 start_unlocking(lock, agent);
1497         } else {
1498                 /*
1499                  * WRITING -> BYPASSING transition: There's no waiter, no update needed, and no
1500                  * duplicate lock held, so both the agent and lock have no more work to do. The
1501                  * agent will release its allocation lock in cleanup.
1502                  */
1503                 start_bypassing(lock, agent);
1504         }
1505 }
1506
1507 /**
1508  * select_writing_agent() - Search through the lock waiters for a data_vio that has an allocation.
1509  * @lock: The hash lock to modify.
1510  *
1511  * If an allocation is found, swap agents, put the old agent at the head of the wait queue, then
1512  * return the new agent. Otherwise, just return the current agent.
1513  */
1514 static struct data_vio *select_writing_agent(struct hash_lock *lock)
1515 {
1516         struct vdo_wait_queue temp_queue;
1517         struct data_vio *data_vio;
1518
1519         vdo_waitq_init(&temp_queue);
1520
1521         /*
1522          * Move waiters to the temp queue one-by-one until we find an allocation. Not ideal to
1523          * search, but it only happens when nearly out of space.
1524          */
1525         while (((data_vio = dequeue_lock_waiter(lock)) != NULL) &&
1526                !data_vio_has_allocation(data_vio)) {
1527                 /* Use the lower-level enqueue since we're just moving waiters around. */
1528                 vdo_waitq_enqueue_waiter(&temp_queue, &data_vio->waiter);
1529         }
1530
1531         if (data_vio != NULL) {
1532                 /*
1533                  * Move the rest of the waiters over to the temp queue, preserving the order they
1534                  * arrived at the lock.
1535                  */
1536                 vdo_waitq_transfer_all_waiters(&lock->waiters, &temp_queue);
1537
1538                 /*
1539                  * The current agent is being replaced and will have to wait to dedupe; make it the
1540                  * first waiter since it was the first to reach the lock.
1541                  */
1542                 vdo_waitq_enqueue_waiter(&lock->waiters, &lock->agent->waiter);
1543                 lock->agent = data_vio;
1544         } else {
1545                 /* No one has an allocation, so keep the current agent. */
1546                 data_vio = lock->agent;
1547         }
1548
1549         /* Swap all the waiters back onto the lock's queue. */
1550         vdo_waitq_transfer_all_waiters(&temp_queue, &lock->waiters);
1551         return data_vio;
1552 }
1553
1554 /**
1555  * start_writing() - Begin the non-duplicate write path.
1556  * @lock: The hash lock (currently must be QUERYING).
1557  * @agent: The data_vio currently acting as the agent for the lock.
1558  *
1559  * Begins the non-duplicate write path for a hash lock that had no advice, selecting a data_vio
1560  * with an allocation as a new agent, if necessary, then resuming the agent on the data_vio write
1561  * path.
1562  */
1563 static void start_writing(struct hash_lock *lock, struct data_vio *agent)
1564 {
1565         lock->state = VDO_HASH_LOCK_WRITING;
1566
1567         /*
1568          * The agent might not have received an allocation and so can't be used for writing, but
1569          * it's entirely possible that one of the waiters did.
1570          */
1571         if (!data_vio_has_allocation(agent)) {
1572                 agent = select_writing_agent(lock);
1573                 /* If none of the waiters had an allocation, the writes all have to fail. */
1574                 if (!data_vio_has_allocation(agent)) {
1575                         /*
1576                          * TODO: Should we keep a variant of BYPASSING that causes new arrivals to
1577                          * fail immediately if they don't have an allocation? It might be possible
1578                          * that on some path there would be non-waiters still referencing the lock,
1579                          * so it would remain in the map as everything is currently spelled, even
1580                          * if the agent and all waiters release.
1581                          */
1582                         continue_data_vio_with_error(agent, VDO_NO_SPACE);
1583                         return;
1584                 }
1585         }
1586
1587         /*
1588          * If the agent compresses, it might wait indefinitely in the packer, which would be bad if
1589          * there are any other data_vios waiting.
1590          */
1591         if (vdo_waitq_has_waiters(&lock->waiters))
1592                 cancel_data_vio_compression(agent);
1593
1594         /*
1595          * Send the agent to the compress/pack/write path in vioWrite. If it succeeds, it will
1596          * return to the hash lock via vdo_continue_hash_lock() and call finish_writing().
1597          */
1598         launch_compress_data_vio(agent);
1599 }
1600
1601 /*
1602  * Decode VDO duplicate advice from the old_metadata field of a UDS request.
1603  * Returns true if valid advice was found and decoded
1604  */
1605 static bool decode_uds_advice(struct dedupe_context *context)
1606 {
1607         const struct uds_request *request = &context->request;
1608         struct data_vio *data_vio = context->requestor;
1609         size_t offset = 0;
1610         const struct uds_record_data *encoding = &request->old_metadata;
1611         struct vdo *vdo = vdo_from_data_vio(data_vio);
1612         struct zoned_pbn *advice = &data_vio->duplicate;
1613         u8 version;
1614         int result;
1615
1616         if ((request->status != UDS_SUCCESS) || !request->found)
1617                 return false;
1618
1619         version = encoding->data[offset++];
1620         if (version != UDS_ADVICE_VERSION) {
1621                 uds_log_error("invalid UDS advice version code %u", version);
1622                 return false;
1623         }
1624
1625         advice->state = encoding->data[offset++];
1626         advice->pbn = get_unaligned_le64(&encoding->data[offset]);
1627         offset += sizeof(u64);
1628         BUG_ON(offset != UDS_ADVICE_SIZE);
1629
1630         /* Don't use advice that's clearly meaningless. */
1631         if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) {
1632                 uds_log_debug("Invalid advice from deduplication server: pbn %llu, state %u. Giving up on deduplication of logical block %llu",
1633                               (unsigned long long) advice->pbn, advice->state,
1634                               (unsigned long long) data_vio->logical.lbn);
1635                 atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1636                 return false;
1637         }
1638
1639         result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone);
1640         if ((result != VDO_SUCCESS) || (advice->zone == NULL)) {
1641                 uds_log_debug("Invalid physical block number from deduplication server: %llu, giving up on deduplication of logical block %llu",
1642                               (unsigned long long) advice->pbn,
1643                               (unsigned long long) data_vio->logical.lbn);
1644                 atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1645                 return false;
1646         }
1647
1648         return true;
1649 }
1650
1651 static void process_query_result(struct data_vio *agent)
1652 {
1653         struct dedupe_context *context = agent->dedupe_context;
1654
1655         if (context == NULL)
1656                 return;
1657
1658         if (change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE)) {
1659                 agent->is_duplicate = decode_uds_advice(context);
1660                 release_context(context);
1661         }
1662 }
1663
1664 /**
1665  * finish_querying() - Process the result of a UDS query performed by the agent for the lock.
1666  * @completion: The completion of the data_vio that performed the query.
1667  *
1668  * This continuation is registered in start_querying().
1669  */
1670 static void finish_querying(struct vdo_completion *completion)
1671 {
1672         struct data_vio *agent = as_data_vio(completion);
1673         struct hash_lock *lock = agent->hash_lock;
1674
1675         assert_hash_lock_agent(agent, __func__);
1676
1677         process_query_result(agent);
1678
1679         if (agent->is_duplicate) {
1680                 lock->duplicate = agent->duplicate;
1681                 /*
1682                  * QUERYING -> LOCKING transition: Valid advice was obtained from UDS. Use the
1683                  * QUERYING agent to start the hash lock on the unverified dedupe path, verifying
1684                  * that the advice can be used.
1685                  */
1686                 start_locking(lock, agent);
1687         } else {
1688                 /*
1689                  * The agent will be used as the duplicate if has an allocation; if it does, that
1690                  * location was posted to UDS, so no update will be needed.
1691                  */
1692                 lock->update_advice = !data_vio_has_allocation(agent);
1693                 /*
1694                  * QUERYING -> WRITING transition: There was no advice or the advice wasn't valid,
1695                  * so try to write or compress the data.
1696                  */
1697                 start_writing(lock, agent);
1698         }
1699 }
1700
1701 /**
1702  * start_querying() - Start deduplication for a hash lock.
1703  * @lock: The initialized hash lock.
1704  * @data_vio: The data_vio that has just obtained the new lock.
1705  *
1706  * Starts deduplication for a hash lock that has finished initializing by making the data_vio that
1707  * requested it the agent, entering the QUERYING state, and using the agent to perform the UDS
1708  * query on behalf of the lock.
1709  */
1710 static void start_querying(struct hash_lock *lock, struct data_vio *data_vio)
1711 {
1712         lock->agent = data_vio;
1713         lock->state = VDO_HASH_LOCK_QUERYING;
1714         data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION;
1715         set_data_vio_hash_zone_callback(data_vio, finish_querying);
1716         query_index(data_vio,
1717                     (data_vio_has_allocation(data_vio) ? UDS_POST : UDS_QUERY));
1718 }
1719
1720 /**
1721  * report_bogus_lock_state() - Complain that a data_vio has entered a hash_lock that is in an
1722  *                             unimplemented or unusable state and continue the data_vio with an
1723  *                             error.
1724  * @lock: The hash lock.
1725  * @data_vio: The data_vio attempting to enter the lock.
1726  */
1727 static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio)
1728 {
1729         ASSERT_LOG_ONLY(false, "hash lock must not be in unimplemented state %s",
1730                         get_hash_lock_state_name(lock->state));
1731         continue_data_vio_with_error(data_vio, VDO_LOCK_ERROR);
1732 }
1733
1734 /**
1735  * vdo_continue_hash_lock() - Continue the processing state after writing, compressing, or
1736  *                            deduplicating.
1737  * @data_vio: The data_vio to continue processing in its hash lock.
1738  *
1739  * Asynchronously continue processing a data_vio in its hash lock after it has finished writing,
1740  * compressing, or deduplicating, so it can share the result with any data_vios waiting in the hash
1741  * lock, or update the UDS index, or simply release its share of the lock.
1742  *
1743  * Context: This must only be called in the correct thread for the hash zone.
1744  */
1745 void vdo_continue_hash_lock(struct vdo_completion *completion)
1746 {
1747         struct data_vio *data_vio = as_data_vio(completion);
1748         struct hash_lock *lock = data_vio->hash_lock;
1749
1750         switch (lock->state) {
1751         case VDO_HASH_LOCK_WRITING:
1752                 ASSERT_LOG_ONLY(data_vio == lock->agent,
1753                                 "only the lock agent may continue the lock");
1754                 finish_writing(lock, data_vio);
1755                 break;
1756
1757         case VDO_HASH_LOCK_DEDUPING:
1758                 finish_deduping(lock, data_vio);
1759                 break;
1760
1761         case VDO_HASH_LOCK_BYPASSING:
1762                 /* This data_vio has finished the write path and the lock doesn't need it. */
1763                 exit_hash_lock(data_vio);
1764                 break;
1765
1766         case VDO_HASH_LOCK_INITIALIZING:
1767         case VDO_HASH_LOCK_QUERYING:
1768         case VDO_HASH_LOCK_UPDATING:
1769         case VDO_HASH_LOCK_LOCKING:
1770         case VDO_HASH_LOCK_VERIFYING:
1771         case VDO_HASH_LOCK_UNLOCKING:
1772                 /* A lock in this state should never be re-entered. */
1773                 report_bogus_lock_state(lock, data_vio);
1774                 break;
1775
1776         default:
1777                 report_bogus_lock_state(lock, data_vio);
1778         }
1779 }
1780
1781 /**
1782  * is_hash_collision() - Check to see if a hash collision has occurred.
1783  * @lock: The lock to check.
1784  * @candidate: The data_vio seeking to share the lock.
1785  *
1786  * Check whether the data in data_vios sharing a lock is different than in a data_vio seeking to
1787  * share the lock, which should only be possible in the extremely unlikely case of a hash
1788  * collision.
1789  *
1790  * Return: true if the given data_vio must not share the lock because it doesn't have the same data
1791  *         as the lock holders.
1792  */
1793 static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate)
1794 {
1795         struct data_vio *lock_holder;
1796         struct hash_zone *zone;
1797         bool collides;
1798
1799         if (list_empty(&lock->duplicate_ring))
1800                 return false;
1801
1802         lock_holder = list_first_entry(&lock->duplicate_ring, struct data_vio,
1803                                        hash_lock_entry);
1804         zone = candidate->hash_zone;
1805         collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data);
1806         if (collides)
1807                 increment_stat(&zone->statistics.concurrent_hash_collisions);
1808         else
1809                 increment_stat(&zone->statistics.concurrent_data_matches);
1810
1811         return collides;
1812 }
1813
1814 static inline int assert_hash_lock_preconditions(const struct data_vio *data_vio)
1815 {
1816         int result;
1817
1818         /* FIXME: BUG_ON() and/or enter read-only mode? */
1819         result = ASSERT(data_vio->hash_lock == NULL,
1820                         "must not already hold a hash lock");
1821         if (result != VDO_SUCCESS)
1822                 return result;
1823
1824         result = ASSERT(list_empty(&data_vio->hash_lock_entry),
1825                         "must not already be a member of a hash lock ring");
1826         if (result != VDO_SUCCESS)
1827                 return result;
1828
1829         return ASSERT(data_vio->recovery_sequence_number == 0,
1830                       "must not hold a recovery lock when getting a hash lock");
1831 }
1832
1833 /**
1834  * vdo_acquire_hash_lock() - Acquire or share a lock on a record name.
1835  * @data_vio: The data_vio acquiring a lock on its record name.
1836  *
1837  * Acquire or share a lock on the hash (record name) of the data in a data_vio, updating the
1838  * data_vio to reference the lock. This must only be called in the correct thread for the zone. In
1839  * the unlikely case of a hash collision, this function will succeed, but the data_vio will not get
1840  * a lock reference.
1841  */
1842 void vdo_acquire_hash_lock(struct vdo_completion *completion)
1843 {
1844         struct data_vio *data_vio = as_data_vio(completion);
1845         struct hash_lock *lock;
1846         int result;
1847
1848         assert_data_vio_in_hash_zone(data_vio);
1849
1850         result = assert_hash_lock_preconditions(data_vio);
1851         if (result != VDO_SUCCESS) {
1852                 continue_data_vio_with_error(data_vio, result);
1853                 return;
1854         }
1855
1856         result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock);
1857         if (result != VDO_SUCCESS) {
1858                 continue_data_vio_with_error(data_vio, result);
1859                 return;
1860         }
1861
1862         if (is_hash_collision(lock, data_vio)) {
1863                 /*
1864                  * Hash collisions are extremely unlikely, but the bogus dedupe would be a data
1865                  * corruption. Bypass optimization entirely. We can't compress a data_vio without
1866                  * a hash_lock as the compressed write depends on the hash_lock to manage the
1867                  * references for the compressed block.
1868                  */
1869                 write_data_vio(data_vio);
1870                 return;
1871         }
1872
1873         set_hash_lock(data_vio, lock);
1874         switch (lock->state) {
1875         case VDO_HASH_LOCK_INITIALIZING:
1876                 start_querying(lock, data_vio);
1877                 return;
1878
1879         case VDO_HASH_LOCK_QUERYING:
1880         case VDO_HASH_LOCK_WRITING:
1881         case VDO_HASH_LOCK_UPDATING:
1882         case VDO_HASH_LOCK_LOCKING:
1883         case VDO_HASH_LOCK_VERIFYING:
1884         case VDO_HASH_LOCK_UNLOCKING:
1885                 /* The lock is busy, and can't be shared yet. */
1886                 wait_on_hash_lock(lock, data_vio);
1887                 return;
1888
1889         case VDO_HASH_LOCK_BYPASSING:
1890                 /* We can't use this lock, so bypass optimization entirely. */
1891                 vdo_release_hash_lock(data_vio);
1892                 write_data_vio(data_vio);
1893                 return;
1894
1895         case VDO_HASH_LOCK_DEDUPING:
1896                 launch_dedupe(lock, data_vio, false);
1897                 return;
1898
1899         default:
1900                 /* A lock in this state should not be acquired by new VIOs. */
1901                 report_bogus_lock_state(lock, data_vio);
1902         }
1903 }
1904
1905 /**
1906  * vdo_release_hash_lock() - Release a data_vio's share of a hash lock, if held, and null out the
1907  *                           data_vio's reference to it.
1908  * @data_vio: The data_vio releasing its hash lock.
1909  *
1910  * If the data_vio is the only one holding the lock, this also releases any resources or locks used
1911  * by the hash lock (such as a PBN read lock on a block containing data with the same hash) and
1912  * returns the lock to the hash zone's lock pool.
1913  *
1914  * Context: This must only be called in the correct thread for the hash zone.
1915  */
1916 void vdo_release_hash_lock(struct data_vio *data_vio)
1917 {
1918         u64 lock_key;
1919         struct hash_lock *lock = data_vio->hash_lock;
1920         struct hash_zone *zone = data_vio->hash_zone;
1921
1922         if (lock == NULL)
1923                 return;
1924
1925         set_hash_lock(data_vio, NULL);
1926
1927         if (lock->reference_count > 0) {
1928                 /* The lock is still in use by other data_vios. */
1929                 return;
1930         }
1931
1932         lock_key = hash_lock_key(lock);
1933         if (lock->registered) {
1934                 struct hash_lock *removed;
1935
1936                 removed = vdo_int_map_remove(zone->hash_lock_map, lock_key);
1937                 ASSERT_LOG_ONLY(lock == removed,
1938                                 "hash lock being released must have been mapped");
1939         } else {
1940                 ASSERT_LOG_ONLY(lock != vdo_int_map_get(zone->hash_lock_map, lock_key),
1941                                 "unregistered hash lock must not be in the lock map");
1942         }
1943
1944         ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
1945                         "hash lock returned to zone must have no waiters");
1946         ASSERT_LOG_ONLY((lock->duplicate_lock == NULL),
1947                         "hash lock returned to zone must not reference a PBN lock");
1948         ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING),
1949                         "returned hash lock must not be in use with state %s",
1950                         get_hash_lock_state_name(lock->state));
1951         ASSERT_LOG_ONLY(list_empty(&lock->pool_node),
1952                         "hash lock returned to zone must not be in a pool ring");
1953         ASSERT_LOG_ONLY(list_empty(&lock->duplicate_ring),
1954                         "hash lock returned to zone must not reference DataVIOs");
1955
1956         return_hash_lock_to_pool(zone, lock);
1957 }
1958
1959 /**
1960  * transfer_allocation_lock() - Transfer a data_vio's downgraded allocation PBN lock to the
1961  *                              data_vio's hash lock, converting it to a duplicate PBN lock.
1962  * @data_vio: The data_vio holding the allocation lock to transfer.
1963  */
1964 static void transfer_allocation_lock(struct data_vio *data_vio)
1965 {
1966         struct allocation *allocation = &data_vio->allocation;
1967         struct hash_lock *hash_lock = data_vio->hash_lock;
1968
1969         ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn,
1970                         "transferred lock must be for the block written");
1971
1972         allocation->pbn = VDO_ZERO_BLOCK;
1973
1974         ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock),
1975                         "must have downgraded the allocation lock before transfer");
1976
1977         hash_lock->duplicate = data_vio->new_mapped;
1978         data_vio->duplicate = data_vio->new_mapped;
1979
1980         /*
1981          * Since the lock is being transferred, the holder count doesn't change (and isn't even
1982          * safe to examine on this thread).
1983          */
1984         hash_lock->duplicate_lock = uds_forget(allocation->lock);
1985 }
1986
1987 /**
1988  * vdo_share_compressed_write_lock() - Make a data_vio's hash lock a shared holder of the PBN lock
1989  *                                     on the compressed block to which its data was just written.
1990  * @data_vio: The data_vio which was just compressed.
1991  * @pbn_lock: The PBN lock on the compressed block.
1992  *
1993  * If the lock is still a write lock (as it will be for the first share), it will be converted to a
1994  * read lock. This also reserves a reference count increment for the data_vio.
1995  */
1996 void vdo_share_compressed_write_lock(struct data_vio *data_vio,
1997                                      struct pbn_lock *pbn_lock)
1998 {
1999         bool claimed;
2000
2001         ASSERT_LOG_ONLY(vdo_get_duplicate_lock(data_vio) == NULL,
2002                         "a duplicate PBN lock should not exist when writing");
2003         ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state),
2004                         "lock transfer must be for a compressed write");
2005         assert_data_vio_in_new_mapped_zone(data_vio);
2006
2007         /* First sharer downgrades the lock. */
2008         if (!vdo_is_pbn_read_lock(pbn_lock))
2009                 vdo_downgrade_pbn_write_lock(pbn_lock, true);
2010
2011         /*
2012          * Get a share of the PBN lock, ensuring it cannot be released until after this data_vio
2013          * has had a chance to journal a reference.
2014          */
2015         data_vio->duplicate = data_vio->new_mapped;
2016         data_vio->hash_lock->duplicate = data_vio->new_mapped;
2017         set_duplicate_lock(data_vio->hash_lock, pbn_lock);
2018
2019         /*
2020          * Claim a reference for this data_vio. Necessary since another hash_lock might start
2021          * deduplicating against it before our incRef.
2022          */
2023         claimed = vdo_claim_pbn_lock_increment(pbn_lock);
2024         ASSERT_LOG_ONLY(claimed, "impossible to fail to claim an initial increment");
2025 }
2026
2027 static void dedupe_kobj_release(struct kobject *directory)
2028 {
2029         uds_free(container_of(directory, struct hash_zones, dedupe_directory));
2030 }
2031
2032 static ssize_t dedupe_status_show(struct kobject *directory, struct attribute *attr,
2033                                   char *buf)
2034 {
2035         struct uds_attribute *ua = container_of(attr, struct uds_attribute, attr);
2036         struct hash_zones *zones = container_of(directory, struct hash_zones,
2037                                                 dedupe_directory);
2038
2039         if (ua->show_string != NULL)
2040                 return sprintf(buf, "%s\n", ua->show_string(zones));
2041         else
2042                 return -EINVAL;
2043 }
2044
2045 static ssize_t dedupe_status_store(struct kobject *kobj __always_unused,
2046                                    struct attribute *attr __always_unused,
2047                                    const char *buf __always_unused,
2048                                    size_t length __always_unused)
2049 {
2050         return -EINVAL;
2051 }
2052
2053 /*----------------------------------------------------------------------*/
2054
2055 static const struct sysfs_ops dedupe_sysfs_ops = {
2056         .show = dedupe_status_show,
2057         .store = dedupe_status_store,
2058 };
2059
2060 static struct uds_attribute dedupe_status_attribute = {
2061         .attr = {.name = "status", .mode = 0444, },
2062         .show_string = vdo_get_dedupe_index_state_name,
2063 };
2064
2065 static struct attribute *dedupe_attrs[] = {
2066         &dedupe_status_attribute.attr,
2067         NULL,
2068 };
2069 ATTRIBUTE_GROUPS(dedupe);
2070
2071 static const struct kobj_type dedupe_directory_type = {
2072         .release = dedupe_kobj_release,
2073         .sysfs_ops = &dedupe_sysfs_ops,
2074         .default_groups = dedupe_groups,
2075 };
2076
2077 static void start_uds_queue(void *ptr)
2078 {
2079         /*
2080          * Allow the UDS dedupe worker thread to do memory allocations. It will only do allocations
2081          * during the UDS calls that open or close an index, but those allocations can safely sleep
2082          * while reserving a large amount of memory. We could use an allocations_allowed boolean
2083          * (like the base threads do), but it would be an unnecessary embellishment.
2084          */
2085         struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
2086
2087         uds_register_allocating_thread(&thread->allocating_thread, NULL);
2088 }
2089
2090 static void finish_uds_queue(void *ptr __always_unused)
2091 {
2092         uds_unregister_allocating_thread();
2093 }
2094
2095 static void close_index(struct hash_zones *zones)
2096 {
2097         int result;
2098
2099         /*
2100          * Change the index state so that get_index_statistics() will not try to use the index
2101          * session we are closing.
2102          */
2103         zones->index_state = IS_CHANGING;
2104         /* Close the index session, while not holding the lock. */
2105         spin_unlock(&zones->lock);
2106         result = uds_close_index(zones->index_session);
2107
2108         if (result != UDS_SUCCESS)
2109                 uds_log_error_strerror(result, "Error closing index");
2110         spin_lock(&zones->lock);
2111         zones->index_state = IS_CLOSED;
2112         zones->error_flag |= result != UDS_SUCCESS;
2113         /* ASSERTION: We leave in IS_CLOSED state. */
2114 }
2115
2116 static void open_index(struct hash_zones *zones)
2117 {
2118         /* ASSERTION: We enter in IS_CLOSED state. */
2119         int result;
2120         bool create_flag = zones->create_flag;
2121
2122         zones->create_flag = false;
2123         /*
2124          * Change the index state so that the it will be reported to the outside world as
2125          * "opening".
2126          */
2127         zones->index_state = IS_CHANGING;
2128         zones->error_flag = false;
2129
2130         /* Open the index session, while not holding the lock */
2131         spin_unlock(&zones->lock);
2132         result = uds_open_index(create_flag ? UDS_CREATE : UDS_LOAD,
2133                                 &zones->parameters, zones->index_session);
2134         if (result != UDS_SUCCESS)
2135                 uds_log_error_strerror(result, "Error opening index");
2136
2137         spin_lock(&zones->lock);
2138         if (!create_flag) {
2139                 switch (result) {
2140                 case -ENOENT:
2141                         /*
2142                          * Either there is no index, or there is no way we can recover the index.
2143                          * We will be called again and try to create a new index.
2144                          */
2145                         zones->index_state = IS_CLOSED;
2146                         zones->create_flag = true;
2147                         return;
2148                 default:
2149                         break;
2150                 }
2151         }
2152         if (result == UDS_SUCCESS) {
2153                 zones->index_state = IS_OPENED;
2154         } else {
2155                 zones->index_state = IS_CLOSED;
2156                 zones->index_target = IS_CLOSED;
2157                 zones->error_flag = true;
2158                 spin_unlock(&zones->lock);
2159                 uds_log_info("Setting UDS index target state to error");
2160                 spin_lock(&zones->lock);
2161         }
2162         /*
2163          * ASSERTION: On success, we leave in IS_OPENED state.
2164          * ASSERTION: On failure, we leave in IS_CLOSED state.
2165          */
2166 }
2167
2168 static void change_dedupe_state(struct vdo_completion *completion)
2169 {
2170         struct hash_zones *zones = as_hash_zones(completion);
2171
2172         spin_lock(&zones->lock);
2173
2174         /* Loop until the index is in the target state and the create flag is clear. */
2175         while (vdo_is_state_normal(&zones->state) &&
2176                ((zones->index_state != zones->index_target) || zones->create_flag)) {
2177                 if (zones->index_state == IS_OPENED)
2178                         close_index(zones);
2179                 else
2180                         open_index(zones);
2181         }
2182
2183         zones->changing = false;
2184         spin_unlock(&zones->lock);
2185 }
2186
2187 static void start_expiration_timer(struct dedupe_context *context)
2188 {
2189         u64 start_time = context->submission_jiffies;
2190         u64 end_time;
2191
2192         if (!change_timer_state(context->zone, DEDUPE_QUERY_TIMER_IDLE,
2193                                 DEDUPE_QUERY_TIMER_RUNNING))
2194                 return;
2195
2196         end_time = max(start_time + vdo_dedupe_index_timeout_jiffies,
2197                        jiffies + vdo_dedupe_index_min_timer_jiffies);
2198         mod_timer(&context->zone->timer, end_time);
2199 }
2200
2201 /**
2202  * report_dedupe_timeouts() - Record and eventually report that some dedupe requests reached their
2203  *                            expiration time without getting answers, so we timed them out.
2204  * @zones: the hash zones.
2205  * @timeouts: the number of newly timed out requests.
2206  */
2207 static void report_dedupe_timeouts(struct hash_zones *zones, unsigned int timeouts)
2208 {
2209         atomic64_add(timeouts, &zones->timeouts);
2210         spin_lock(&zones->lock);
2211         if (__ratelimit(&zones->ratelimiter)) {
2212                 u64 unreported = atomic64_read(&zones->timeouts);
2213
2214                 unreported -= zones->reported_timeouts;
2215                 uds_log_debug("UDS index timeout on %llu requests",
2216                               (unsigned long long) unreported);
2217                 zones->reported_timeouts += unreported;
2218         }
2219         spin_unlock(&zones->lock);
2220 }
2221
2222 static int initialize_index(struct vdo *vdo, struct hash_zones *zones)
2223 {
2224         int result;
2225         off_t uds_offset;
2226         struct volume_geometry geometry = vdo->geometry;
2227         static const struct vdo_work_queue_type uds_queue_type = {
2228                 .start = start_uds_queue,
2229                 .finish = finish_uds_queue,
2230                 .max_priority = UDS_Q_MAX_PRIORITY,
2231                 .default_priority = UDS_Q_PRIORITY,
2232         };
2233
2234         vdo_set_dedupe_index_timeout_interval(vdo_dedupe_index_timeout_interval);
2235         vdo_set_dedupe_index_min_timer_interval(vdo_dedupe_index_min_timer_interval);
2236
2237         /*
2238          * Since we will save up the timeouts that would have been reported but were ratelimited,
2239          * we don't need to report ratelimiting.
2240          */
2241         ratelimit_default_init(&zones->ratelimiter);
2242         ratelimit_set_flags(&zones->ratelimiter, RATELIMIT_MSG_ON_RELEASE);
2243         uds_offset = ((vdo_get_index_region_start(geometry) -
2244                        geometry.bio_offset) * VDO_BLOCK_SIZE);
2245         zones->parameters = (struct uds_parameters) {
2246                 .bdev = vdo->device_config->owned_device->bdev,
2247                 .offset = uds_offset,
2248                 .size = (vdo_get_index_region_size(geometry) * VDO_BLOCK_SIZE),
2249                 .memory_size = geometry.index_config.mem,
2250                 .sparse = geometry.index_config.sparse,
2251                 .nonce = (u64) geometry.nonce,
2252         };
2253
2254         result = uds_create_index_session(&zones->index_session);
2255         if (result != UDS_SUCCESS)
2256                 return result;
2257
2258         result = vdo_make_thread(vdo, vdo->thread_config.dedupe_thread, &uds_queue_type,
2259                                  1, NULL);
2260         if (result != VDO_SUCCESS) {
2261                 uds_destroy_index_session(uds_forget(zones->index_session));
2262                 uds_log_error("UDS index queue initialization failed (%d)", result);
2263                 return result;
2264         }
2265
2266         vdo_initialize_completion(&zones->completion, vdo, VDO_HASH_ZONES_COMPLETION);
2267         vdo_set_completion_callback(&zones->completion, change_dedupe_state,
2268                                     vdo->thread_config.dedupe_thread);
2269         kobject_init(&zones->dedupe_directory, &dedupe_directory_type);
2270         return VDO_SUCCESS;
2271 }
2272
2273 /**
2274  * finish_index_operation() - This is the UDS callback for index queries.
2275  * @request: The uds request which has just completed.
2276  */
2277 static void finish_index_operation(struct uds_request *request)
2278 {
2279         struct dedupe_context *context = container_of(request, struct dedupe_context,
2280                                                       request);
2281
2282         if (change_context_state(context, DEDUPE_CONTEXT_PENDING,
2283                                  DEDUPE_CONTEXT_COMPLETE)) {
2284                 /*
2285                  * This query has not timed out, so send its data_vio back to its hash zone to
2286                  * process the results.
2287                  */
2288                 continue_data_vio(context->requestor);
2289                 return;
2290         }
2291
2292         /*
2293          * This query has timed out, so try to mark it complete and hence eligible for reuse. Its
2294          * data_vio has already moved on.
2295          */
2296         if (!change_context_state(context, DEDUPE_CONTEXT_TIMED_OUT,
2297                                   DEDUPE_CONTEXT_TIMED_OUT_COMPLETE))
2298                 ASSERT_LOG_ONLY(false, "uds request was timed out (state %d)",
2299                                 atomic_read(&context->state));
2300
2301         uds_funnel_queue_put(context->zone->timed_out_complete, &context->queue_entry);
2302 }
2303
2304 /**
2305  * check_for_drain_complete() - Check whether this zone has drained.
2306  * @zone: The zone to check.
2307  */
2308 static void check_for_drain_complete(struct hash_zone *zone)
2309 {
2310         data_vio_count_t recycled = 0;
2311
2312         if (!vdo_is_state_draining(&zone->state))
2313                 return;
2314
2315         if ((atomic_read(&zone->timer_state) == DEDUPE_QUERY_TIMER_IDLE) ||
2316             change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2317                                DEDUPE_QUERY_TIMER_IDLE)) {
2318                 del_timer_sync(&zone->timer);
2319         } else {
2320                 /*
2321                  * There is an in flight time-out, which must get processed before we can continue.
2322                  */
2323                 return;
2324         }
2325
2326         for (;;) {
2327                 struct dedupe_context *context;
2328                 struct funnel_queue_entry *entry;
2329
2330                 entry = uds_funnel_queue_poll(zone->timed_out_complete);
2331                 if (entry == NULL)
2332                         break;
2333
2334                 context = container_of(entry, struct dedupe_context, queue_entry);
2335                 atomic_set(&context->state, DEDUPE_CONTEXT_IDLE);
2336                 list_add(&context->list_entry, &zone->available);
2337                 recycled++;
2338         }
2339
2340         if (recycled > 0)
2341                 WRITE_ONCE(zone->active, zone->active - recycled);
2342         ASSERT_LOG_ONLY(READ_ONCE(zone->active) == 0, "all contexts inactive");
2343         vdo_finish_draining(&zone->state);
2344 }
2345
2346 static void timeout_index_operations_callback(struct vdo_completion *completion)
2347 {
2348         struct dedupe_context *context, *tmp;
2349         struct hash_zone *zone = as_hash_zone(completion);
2350         u64 timeout_jiffies = msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
2351         unsigned long cutoff = jiffies - timeout_jiffies;
2352         unsigned int timed_out = 0;
2353
2354         atomic_set(&zone->timer_state, DEDUPE_QUERY_TIMER_IDLE);
2355         list_for_each_entry_safe(context, tmp, &zone->pending, list_entry) {
2356                 if (cutoff <= context->submission_jiffies) {
2357                         /*
2358                          * We have reached the oldest query which has not timed out yet, so restart
2359                          * the timer.
2360                          */
2361                         start_expiration_timer(context);
2362                         break;
2363                 }
2364
2365                 if (!change_context_state(context, DEDUPE_CONTEXT_PENDING,
2366                                           DEDUPE_CONTEXT_TIMED_OUT)) {
2367                         /*
2368                          * This context completed between the time the timeout fired, and now. We
2369                          * can treat it as a a successful query, its requestor is already enqueued
2370                          * to process it.
2371                          */
2372                         continue;
2373                 }
2374
2375                 /*
2376                  * Remove this context from the pending list so we won't look at it again on a
2377                  * subsequent timeout. Once the index completes it, it will be reused. Meanwhile,
2378                  * send its requestor on its way.
2379                  */
2380                 list_del_init(&context->list_entry);
2381                 continue_data_vio(context->requestor);
2382                 timed_out++;
2383         }
2384
2385         if (timed_out > 0)
2386                 report_dedupe_timeouts(completion->vdo->hash_zones, timed_out);
2387
2388         check_for_drain_complete(zone);
2389 }
2390
2391 static void timeout_index_operations(struct timer_list *t)
2392 {
2393         struct hash_zone *zone = from_timer(zone, t, timer);
2394
2395         if (change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2396                                DEDUPE_QUERY_TIMER_FIRED))
2397                 vdo_launch_completion(&zone->completion);
2398 }
2399
2400 static int __must_check initialize_zone(struct vdo *vdo, struct hash_zones *zones,
2401                                         zone_count_t zone_number)
2402 {
2403         int result;
2404         data_vio_count_t i;
2405         struct hash_zone *zone = &zones->zones[zone_number];
2406
2407         result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->hash_lock_map);
2408         if (result != VDO_SUCCESS)
2409                 return result;
2410
2411         vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2412         zone->zone_number = zone_number;
2413         zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number];
2414         vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION);
2415         vdo_set_completion_callback(&zone->completion, timeout_index_operations_callback,
2416                                     zone->thread_id);
2417         INIT_LIST_HEAD(&zone->lock_pool);
2418         result = uds_allocate(LOCK_POOL_CAPACITY, struct hash_lock, "hash_lock array",
2419                               &zone->lock_array);
2420         if (result != VDO_SUCCESS)
2421                 return result;
2422
2423         for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2424                 return_hash_lock_to_pool(zone, &zone->lock_array[i]);
2425
2426         INIT_LIST_HEAD(&zone->available);
2427         INIT_LIST_HEAD(&zone->pending);
2428         result = uds_make_funnel_queue(&zone->timed_out_complete);
2429         if (result != VDO_SUCCESS)
2430                 return result;
2431
2432         timer_setup(&zone->timer, timeout_index_operations, 0);
2433
2434         for (i = 0; i < MAXIMUM_VDO_USER_VIOS; i++) {
2435                 struct dedupe_context *context = &zone->contexts[i];
2436
2437                 context->zone = zone;
2438                 context->request.callback = finish_index_operation;
2439                 context->request.session = zones->index_session;
2440                 list_add(&context->list_entry, &zone->available);
2441         }
2442
2443         return vdo_make_default_thread(vdo, zone->thread_id);
2444 }
2445
2446 /** get_thread_id_for_zone() - Implements vdo_zone_thread_getter_fn. */
2447 static thread_id_t get_thread_id_for_zone(void *context, zone_count_t zone_number)
2448 {
2449         struct hash_zones *zones = context;
2450
2451         return zones->zones[zone_number].thread_id;
2452 }
2453
2454 /**
2455  * vdo_make_hash_zones() - Create the hash zones.
2456  *
2457  * @vdo: The vdo to which the zone will belong.
2458  * @zones_ptr: A pointer to hold the zones.
2459  *
2460  * Return: VDO_SUCCESS or an error code.
2461  */
2462 int vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr)
2463 {
2464         int result;
2465         struct hash_zones *zones;
2466         zone_count_t z;
2467         zone_count_t zone_count = vdo->thread_config.hash_zone_count;
2468
2469         if (zone_count == 0)
2470                 return VDO_SUCCESS;
2471
2472         result = uds_allocate_extended(struct hash_zones, zone_count, struct hash_zone,
2473                                        __func__, &zones);
2474         if (result != VDO_SUCCESS)
2475                 return result;
2476
2477         result = initialize_index(vdo, zones);
2478         if (result != VDO_SUCCESS) {
2479                 uds_free(zones);
2480                 return result;
2481         }
2482
2483         vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW);
2484
2485         zones->zone_count = zone_count;
2486         for (z = 0; z < zone_count; z++) {
2487                 result = initialize_zone(vdo, zones, z);
2488                 if (result != VDO_SUCCESS) {
2489                         vdo_free_hash_zones(zones);
2490                         return result;
2491                 }
2492         }
2493
2494         result = vdo_make_action_manager(zones->zone_count, get_thread_id_for_zone,
2495                                          vdo->thread_config.admin_thread, zones, NULL,
2496                                          vdo, &zones->manager);
2497         if (result != VDO_SUCCESS) {
2498                 vdo_free_hash_zones(zones);
2499                 return result;
2500         }
2501
2502         *zones_ptr = zones;
2503         return VDO_SUCCESS;
2504 }
2505
2506 void vdo_finish_dedupe_index(struct hash_zones *zones)
2507 {
2508         if (zones == NULL)
2509                 return;
2510
2511         uds_destroy_index_session(uds_forget(zones->index_session));
2512 }
2513
2514 /**
2515  * vdo_free_hash_zones() - Free the hash zones.
2516  * @zones: The zone to free.
2517  */
2518 void vdo_free_hash_zones(struct hash_zones *zones)
2519 {
2520         zone_count_t i;
2521
2522         if (zones == NULL)
2523                 return;
2524
2525         uds_free(uds_forget(zones->manager));
2526
2527         for (i = 0; i < zones->zone_count; i++) {
2528                 struct hash_zone *zone = &zones->zones[i];
2529
2530                 uds_free_funnel_queue(uds_forget(zone->timed_out_complete));
2531                 vdo_int_map_free(uds_forget(zone->hash_lock_map));
2532                 uds_free(uds_forget(zone->lock_array));
2533         }
2534
2535         if (zones->index_session != NULL)
2536                 vdo_finish_dedupe_index(zones);
2537
2538         ratelimit_state_exit(&zones->ratelimiter);
2539         if (vdo_get_admin_state_code(&zones->state) == VDO_ADMIN_STATE_NEW)
2540                 uds_free(zones);
2541         else
2542                 kobject_put(&zones->dedupe_directory);
2543 }
2544
2545 static void initiate_suspend_index(struct admin_state *state)
2546 {
2547         struct hash_zones *zones = container_of(state, struct hash_zones, state);
2548         enum index_state index_state;
2549
2550         spin_lock(&zones->lock);
2551         index_state = zones->index_state;
2552         spin_unlock(&zones->lock);
2553
2554         if (index_state != IS_CLOSED) {
2555                 bool save = vdo_is_state_saving(&zones->state);
2556                 int result;
2557
2558                 result = uds_suspend_index_session(zones->index_session, save);
2559                 if (result != UDS_SUCCESS)
2560                         uds_log_error_strerror(result, "Error suspending dedupe index");
2561         }
2562
2563         vdo_finish_draining(state);
2564 }
2565
2566 /**
2567  * suspend_index() - Suspend the UDS index prior to draining hash zones.
2568  *
2569  * Implements vdo_action_preamble_fn
2570  */
2571 static void suspend_index(void *context, struct vdo_completion *completion)
2572 {
2573         struct hash_zones *zones = context;
2574
2575         vdo_start_draining(&zones->state,
2576                            vdo_get_current_manager_operation(zones->manager), completion,
2577                            initiate_suspend_index);
2578 }
2579
2580 /**
2581  * initiate_drain() - Initiate a drain.
2582  *
2583  * Implements vdo_admin_initiator_fn.
2584  */
2585 static void initiate_drain(struct admin_state *state)
2586 {
2587         check_for_drain_complete(container_of(state, struct hash_zone, state));
2588 }
2589
2590 /**
2591  * drain_hash_zone() - Drain a hash zone.
2592  *
2593  * Implements vdo_zone_action_fn.
2594  */
2595 static void drain_hash_zone(void *context, zone_count_t zone_number,
2596                             struct vdo_completion *parent)
2597 {
2598         struct hash_zones *zones = context;
2599
2600         vdo_start_draining(&zones->zones[zone_number].state,
2601                            vdo_get_current_manager_operation(zones->manager), parent,
2602                            initiate_drain);
2603 }
2604
2605 /** vdo_drain_hash_zones() - Drain all hash zones. */
2606 void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2607 {
2608         vdo_schedule_operation(zones->manager, parent->vdo->suspend_type, suspend_index,
2609                                drain_hash_zone, NULL, parent);
2610 }
2611
2612 static void launch_dedupe_state_change(struct hash_zones *zones)
2613 {
2614         /* ASSERTION: We enter with the lock held. */
2615         if (zones->changing || !vdo_is_state_normal(&zones->state))
2616                 /* Either a change is already in progress, or changes are not allowed. */
2617                 return;
2618
2619         if (zones->create_flag || (zones->index_state != zones->index_target)) {
2620                 zones->changing = true;
2621                 vdo_launch_completion(&zones->completion);
2622                 return;
2623         }
2624
2625         /* ASSERTION: We exit with the lock held. */
2626 }
2627
2628 /**
2629  * resume_index() - Resume the UDS index prior to resuming hash zones.
2630  *
2631  * Implements vdo_action_preamble_fn
2632  */
2633 static void resume_index(void *context, struct vdo_completion *parent)
2634 {
2635         struct hash_zones *zones = context;
2636         struct device_config *config = parent->vdo->device_config;
2637         int result;
2638
2639         zones->parameters.bdev = config->owned_device->bdev;
2640         result = uds_resume_index_session(zones->index_session, zones->parameters.bdev);
2641         if (result != UDS_SUCCESS)
2642                 uds_log_error_strerror(result, "Error resuming dedupe index");
2643
2644         spin_lock(&zones->lock);
2645         vdo_resume_if_quiescent(&zones->state);
2646
2647         if (config->deduplication) {
2648                 zones->index_target = IS_OPENED;
2649                 WRITE_ONCE(zones->dedupe_flag, true);
2650         } else {
2651                 zones->index_target = IS_CLOSED;
2652         }
2653
2654         launch_dedupe_state_change(zones);
2655         spin_unlock(&zones->lock);
2656
2657         vdo_finish_completion(parent);
2658 }
2659
2660 /**
2661  * resume_hash_zone() - Resume a hash zone.
2662  *
2663  * Implements vdo_zone_action_fn.
2664  */
2665 static void resume_hash_zone(void *context, zone_count_t zone_number,
2666                              struct vdo_completion *parent)
2667 {
2668         struct hash_zone *zone = &(((struct hash_zones *) context)->zones[zone_number]);
2669
2670         vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
2671 }
2672
2673 /**
2674  * vdo_resume_hash_zones() - Resume a set of hash zones.
2675  * @zones: The hash zones to resume.
2676  * @parent: The object to notify when the zones have resumed.
2677  */
2678 void vdo_resume_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2679 {
2680         if (vdo_is_read_only(parent->vdo)) {
2681                 vdo_launch_completion(parent);
2682                 return;
2683         }
2684
2685         vdo_schedule_operation(zones->manager, VDO_ADMIN_STATE_RESUMING, resume_index,
2686                                resume_hash_zone, NULL, parent);
2687 }
2688
2689 /**
2690  * get_hash_zone_statistics() - Add the statistics for this hash zone to the tally for all zones.
2691  * @zone: The hash zone to query.
2692  * @tally: The tally
2693  */
2694 static void get_hash_zone_statistics(const struct hash_zone *zone,
2695                                      struct hash_lock_statistics *tally)
2696 {
2697         const struct hash_lock_statistics *stats = &zone->statistics;
2698
2699         tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid);
2700         tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale);
2701         tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches);
2702         tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions);
2703         tally->curr_dedupe_queries += READ_ONCE(zone->active);
2704 }
2705
2706 static void get_index_statistics(struct hash_zones *zones,
2707                                  struct index_statistics *stats)
2708 {
2709         enum index_state state;
2710         struct uds_index_stats index_stats;
2711         int result;
2712
2713         spin_lock(&zones->lock);
2714         state = zones->index_state;
2715         spin_unlock(&zones->lock);
2716
2717         if (state != IS_OPENED)
2718                 return;
2719
2720         result = uds_get_index_session_stats(zones->index_session, &index_stats);
2721         if (result != UDS_SUCCESS) {
2722                 uds_log_error_strerror(result, "Error reading index stats");
2723                 return;
2724         }
2725
2726         stats->entries_indexed = index_stats.entries_indexed;
2727         stats->posts_found = index_stats.posts_found;
2728         stats->posts_not_found = index_stats.posts_not_found;
2729         stats->queries_found = index_stats.queries_found;
2730         stats->queries_not_found = index_stats.queries_not_found;
2731         stats->updates_found = index_stats.updates_found;
2732         stats->updates_not_found = index_stats.updates_not_found;
2733         stats->entries_discarded = index_stats.entries_discarded;
2734 }
2735
2736 /**
2737  * vdo_get_dedupe_statistics() - Tally the statistics from all the hash zones and the UDS index.
2738  * @hash_zones: The hash zones to query
2739  *
2740  * Return: The sum of the hash lock statistics from all hash zones plus the statistics from the UDS
2741  *         index
2742  */
2743 void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats)
2744
2745 {
2746         zone_count_t zone;
2747
2748         for (zone = 0; zone < zones->zone_count; zone++)
2749                 get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock);
2750
2751         get_index_statistics(zones, &stats->index);
2752
2753         /*
2754          * zones->timeouts gives the number of timeouts, and dedupe_context_busy gives the number
2755          * of queries not made because of earlier timeouts.
2756          */
2757         stats->dedupe_advice_timeouts =
2758                 (atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy));
2759 }
2760
2761 /**
2762  * vdo_select_hash_zone() - Select the hash zone responsible for locking a given record name.
2763  * @zones: The hash_zones from which to select.
2764  * @name: The record name.
2765  *
2766  * Return: The hash zone responsible for the record name.
2767  */
2768 struct hash_zone *vdo_select_hash_zone(struct hash_zones *zones,
2769                                        const struct uds_record_name *name)
2770 {
2771         /*
2772          * Use a fragment of the record name as a hash code. Eight bits of hash should suffice
2773          * since the number of hash zones is small.
2774          * TODO: Verify that the first byte is independent enough.
2775          */
2776         u32 hash = name->name[0];
2777
2778         /*
2779          * Scale the 8-bit hash fragment to a zone index by treating it as a binary fraction and
2780          * multiplying that by the zone count. If the hash is uniformly distributed over [0 ..
2781          * 2^8-1], then (hash * count / 2^8) should be uniformly distributed over [0 .. count-1].
2782          * The multiply and shift is much faster than a divide (modulus) on X86 CPUs.
2783          */
2784         hash = (hash * zones->zone_count) >> 8;
2785         return &zones->zones[hash];
2786 }
2787
2788 /**
2789  * dump_hash_lock() - Dump a compact description of hash_lock to the log if the lock is not on the
2790  *                    free list.
2791  * @lock: The hash lock to dump.
2792  */
2793 static void dump_hash_lock(const struct hash_lock *lock)
2794 {
2795         const char *state;
2796
2797         if (!list_empty(&lock->pool_node)) {
2798                 /* This lock is on the free list. */
2799                 return;
2800         }
2801
2802         /*
2803          * Necessarily cryptic since we can log a lot of these. First three chars of state is
2804          * unambiguous. 'U' indicates a lock not registered in the map.
2805          */
2806         state = get_hash_lock_state_name(lock->state);
2807         uds_log_info("  hl %px: %3.3s %c%llu/%u rc=%u wc=%zu agt=%px",
2808                      lock, state, (lock->registered ? 'D' : 'U'),
2809                      (unsigned long long) lock->duplicate.pbn,
2810                      lock->duplicate.state, lock->reference_count,
2811                      vdo_waitq_num_waiters(&lock->waiters), lock->agent);
2812 }
2813
2814 static const char *index_state_to_string(struct hash_zones *zones,
2815                                          enum index_state state)
2816 {
2817         if (!vdo_is_state_normal(&zones->state))
2818                 return SUSPENDED;
2819
2820         switch (state) {
2821         case IS_CLOSED:
2822                 return zones->error_flag ? ERROR : CLOSED;
2823         case IS_CHANGING:
2824                 return zones->index_target == IS_OPENED ? OPENING : CLOSING;
2825         case IS_OPENED:
2826                 return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE;
2827         default:
2828                 return UNKNOWN;
2829         }
2830 }
2831
2832 /**
2833  * vdo_dump_hash_zone() - Dump information about a hash zone to the log for debugging.
2834  * @zone: The zone to dump.
2835  */
2836 static void dump_hash_zone(const struct hash_zone *zone)
2837 {
2838         data_vio_count_t i;
2839
2840         if (zone->hash_lock_map == NULL) {
2841                 uds_log_info("struct hash_zone %u: NULL map", zone->zone_number);
2842                 return;
2843         }
2844
2845         uds_log_info("struct hash_zone %u: mapSize=%zu",
2846                      zone->zone_number, vdo_int_map_size(zone->hash_lock_map));
2847         for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2848                 dump_hash_lock(&zone->lock_array[i]);
2849 }
2850
2851 /**
2852  * vdo_dump_hash_zones() - Dump information about the hash zones to the log for debugging.
2853  * @zones: The zones to dump.
2854  */
2855 void vdo_dump_hash_zones(struct hash_zones *zones)
2856 {
2857         const char *state, *target;
2858         zone_count_t zone;
2859
2860         spin_lock(&zones->lock);
2861         state = index_state_to_string(zones, zones->index_state);
2862         target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL);
2863         spin_unlock(&zones->lock);
2864
2865         uds_log_info("UDS index: state: %s", state);
2866         if (target != NULL)
2867                 uds_log_info("UDS index: changing to state: %s", target);
2868
2869         for (zone = 0; zone < zones->zone_count; zone++)
2870                 dump_hash_zone(&zones->zones[zone]);
2871 }
2872
2873 void vdo_set_dedupe_index_timeout_interval(unsigned int value)
2874 {
2875         u64 alb_jiffies;
2876
2877         /* Arbitrary maximum value is two minutes */
2878         if (value > 120000)
2879                 value = 120000;
2880         /* Arbitrary minimum value is 2 jiffies */
2881         alb_jiffies = msecs_to_jiffies(value);
2882
2883         if (alb_jiffies < 2) {
2884                 alb_jiffies = 2;
2885                 value = jiffies_to_msecs(alb_jiffies);
2886         }
2887         vdo_dedupe_index_timeout_interval = value;
2888         vdo_dedupe_index_timeout_jiffies = alb_jiffies;
2889 }
2890
2891 void vdo_set_dedupe_index_min_timer_interval(unsigned int value)
2892 {
2893         u64 min_jiffies;
2894
2895         /* Arbitrary maximum value is one second */
2896         if (value > 1000)
2897                 value = 1000;
2898
2899         /* Arbitrary minimum value is 2 jiffies */
2900         min_jiffies = msecs_to_jiffies(value);
2901
2902         if (min_jiffies < 2) {
2903                 min_jiffies = 2;
2904                 value = jiffies_to_msecs(min_jiffies);
2905         }
2906
2907         vdo_dedupe_index_min_timer_interval = value;
2908         vdo_dedupe_index_min_timer_jiffies = min_jiffies;
2909 }
2910
2911 /**
2912  * acquire_context() - Acquire a dedupe context from a hash_zone if any are available.
2913  * @zone: the hash zone
2914  *
2915  * Return: A dedupe_context or NULL if none are available
2916  */
2917 static struct dedupe_context * __must_check acquire_context(struct hash_zone *zone)
2918 {
2919         struct dedupe_context *context;
2920         struct funnel_queue_entry *entry;
2921
2922         assert_in_hash_zone(zone, __func__);
2923
2924         if (!list_empty(&zone->available)) {
2925                 WRITE_ONCE(zone->active, zone->active + 1);
2926                 context = list_first_entry(&zone->available, struct dedupe_context,
2927                                            list_entry);
2928                 list_del_init(&context->list_entry);
2929                 return context;
2930         }
2931
2932         entry = uds_funnel_queue_poll(zone->timed_out_complete);
2933         return ((entry == NULL) ?
2934                 NULL : container_of(entry, struct dedupe_context, queue_entry));
2935 }
2936
2937 static void prepare_uds_request(struct uds_request *request, struct data_vio *data_vio,
2938                                 enum uds_request_type operation)
2939 {
2940         request->record_name = data_vio->record_name;
2941         request->type = operation;
2942         if ((operation == UDS_POST) || (operation == UDS_UPDATE)) {
2943                 size_t offset = 0;
2944                 struct uds_record_data *encoding = &request->new_metadata;
2945
2946                 encoding->data[offset++] = UDS_ADVICE_VERSION;
2947                 encoding->data[offset++] = data_vio->new_mapped.state;
2948                 put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]);
2949                 offset += sizeof(u64);
2950                 BUG_ON(offset != UDS_ADVICE_SIZE);
2951         }
2952 }
2953
2954 /*
2955  * The index operation will inquire about data_vio.record_name, providing (if the operation is
2956  * appropriate) advice from the data_vio's new_mapped fields. The advice found in the index (or
2957  * NULL if none) will be returned via receive_data_vio_dedupe_advice(). dedupe_context.status is
2958  * set to the return status code of any asynchronous index processing.
2959  */
2960 static void query_index(struct data_vio *data_vio, enum uds_request_type operation)
2961 {
2962         int result;
2963         struct dedupe_context *context;
2964         struct vdo *vdo = vdo_from_data_vio(data_vio);
2965         struct hash_zone *zone = data_vio->hash_zone;
2966
2967         assert_data_vio_in_hash_zone(data_vio);
2968
2969         if (!READ_ONCE(vdo->hash_zones->dedupe_flag)) {
2970                 continue_data_vio(data_vio);
2971                 return;
2972         }
2973
2974         context = acquire_context(zone);
2975         if (context == NULL) {
2976                 atomic64_inc(&vdo->hash_zones->dedupe_context_busy);
2977                 continue_data_vio(data_vio);
2978                 return;
2979         }
2980
2981         data_vio->dedupe_context = context;
2982         context->requestor = data_vio;
2983         context->submission_jiffies = jiffies;
2984         prepare_uds_request(&context->request, data_vio, operation);
2985         atomic_set(&context->state, DEDUPE_CONTEXT_PENDING);
2986         list_add_tail(&context->list_entry, &zone->pending);
2987         start_expiration_timer(context);
2988         result = uds_launch_request(&context->request);
2989         if (result != UDS_SUCCESS) {
2990                 context->request.status = result;
2991                 finish_index_operation(&context->request);
2992         }
2993 }
2994
2995 static void set_target_state(struct hash_zones *zones, enum index_state target,
2996                              bool change_dedupe, bool dedupe, bool set_create)
2997 {
2998         const char *old_state, *new_state;
2999
3000         spin_lock(&zones->lock);
3001         old_state = index_state_to_string(zones, zones->index_target);
3002         if (change_dedupe)
3003                 WRITE_ONCE(zones->dedupe_flag, dedupe);
3004
3005         if (set_create)
3006                 zones->create_flag = true;
3007
3008         zones->index_target = target;
3009         launch_dedupe_state_change(zones);
3010         new_state = index_state_to_string(zones, zones->index_target);
3011         spin_unlock(&zones->lock);
3012
3013         if (old_state != new_state)
3014                 uds_log_info("Setting UDS index target state to %s", new_state);
3015 }
3016
3017 const char *vdo_get_dedupe_index_state_name(struct hash_zones *zones)
3018 {
3019         const char *state;
3020
3021         spin_lock(&zones->lock);
3022         state = index_state_to_string(zones, zones->index_state);
3023         spin_unlock(&zones->lock);
3024
3025         return state;
3026 }
3027
3028 /* Handle a dmsetup message relevant to the index. */
3029 int vdo_message_dedupe_index(struct hash_zones *zones, const char *name)
3030 {
3031         if (strcasecmp(name, "index-close") == 0) {
3032                 set_target_state(zones, IS_CLOSED, false, false, false);
3033                 return 0;
3034         } else if (strcasecmp(name, "index-create") == 0) {
3035                 set_target_state(zones, IS_OPENED, false, false, true);
3036                 return 0;
3037         } else if (strcasecmp(name, "index-disable") == 0) {
3038                 set_target_state(zones, IS_OPENED, true, false, false);
3039                 return 0;
3040         } else if (strcasecmp(name, "index-enable") == 0) {
3041                 set_target_state(zones, IS_OPENED, true, true, false);
3042                 return 0;
3043         }
3044
3045         return -EINVAL;
3046 }
3047
3048 int vdo_add_dedupe_index_sysfs(struct hash_zones *zones)
3049 {
3050         int result = kobject_add(&zones->dedupe_directory,
3051                                  &zones->completion.vdo->vdo_directory, "dedupe");
3052
3053         if (result == 0)
3054                 vdo_set_admin_state_code(&zones->state,
3055                                          VDO_ADMIN_STATE_NORMAL_OPERATION);
3056
3057         return result;
3058 }
3059
3060 /* If create_flag, create a new index without first attempting to load an existing index. */
3061 void vdo_start_dedupe_index(struct hash_zones *zones, bool create_flag)
3062 {
3063         set_target_state(zones, IS_OPENED, true, true, create_flag);
3064 }