1 // SPDX-License-Identifier: GPL-2.0-only
3 * Copyright 2023 Red Hat
11 * A hash_lock controls and coordinates writing, index access, and dedupe among groups of data_vios
12 * concurrently writing identical blocks, allowing them to deduplicate not only against advice but
13 * also against each other. This saves on index queries and allows those data_vios to concurrently
14 * deduplicate against a single block instead of being serialized through a PBN read lock. Only one
15 * index query is needed for each hash_lock, instead of one for every data_vio.
17 * A hash_lock acts more like a state machine than a lock. Other than the starting and
18 * ending states INITIALIZING and BYPASSING, every state represents and is held for the duration of
19 * an asynchronous operation. All state transitions are performed on the thread of the hash_zone
20 * containing the lock. An asynchronous operation is almost always performed upon entering a state,
21 * and the callback from that operation triggers exiting the state and entering a new state.
23 * In all states except DEDUPING, there is a single data_vio, called the lock agent, performing the
24 * asynchronous operations on behalf of the lock. The agent will change during the lifetime of the
25 * lock if the lock is shared by more than one data_vio. data_vios waiting to deduplicate are kept
26 * on a wait queue. Viewed a different way, the agent holds the lock exclusively until the lock
27 * enters the DEDUPING state, at which point it becomes a shared lock that all the waiters (and any
28 * new data_vios that arrive) use to share a PBN lock. In state DEDUPING, there is no agent. When
29 * the last data_vio in the lock calls back in DEDUPING, it becomes the agent and the lock becomes
30 * exclusive again. New data_vios that arrive in the lock will also go on the wait queue.
32 * The existence of lock waiters is a key factor controlling which state the lock transitions to
33 * next. When the lock is new or has waiters, it will always try to reach DEDUPING, and when it
34 * doesn't, it will try to clean up and exit.
36 * Deduping requires holding a PBN lock on a block that is known to contain data identical to the
37 * data_vios in the lock, so the lock will send the agent to the duplicate zone to acquire the PBN
38 * lock (LOCKING), to the kernel I/O threads to read and verify the data (VERIFYING), or to write a
39 * new copy of the data to a full data block or a slot in a compressed block (WRITING).
41 * Cleaning up consists of updating the index when the data location is different from the initial
42 * index query (UPDATING, triggered by stale advice, compression, and rollover), releasing the PBN
43 * lock on the duplicate block (UNLOCKING), and if the agent is the last data_vio referencing the
44 * lock, releasing the hash_lock itself back to the hash zone (BYPASSING).
46 * The shortest sequence of states is for non-concurrent writes of new data:
47 * INITIALIZING -> QUERYING -> WRITING -> BYPASSING
48 * This sequence is short because no PBN read lock or index update is needed.
50 * A non-concurrent write that finds valid advice looks like this (endpoints elided):
51 * -> QUERYING -> LOCKING -> VERIFYING -> DEDUPING -> UNLOCKING ->
52 * Or with stale advice (endpoints elided):
53 * -> QUERYING -> LOCKING -> VERIFYING -> UNLOCKING -> WRITING -> UPDATING ->
55 * When there are not enough reference count increments available on a PBN for a data_vio
56 * to deduplicate, a new lock is forked and the excess waiters roll over to the new lock (which
57 * goes directly to WRITING). The new lock takes the place of the old lock in the lock map so new
58 * data_vios will be directed to it. The two locks will proceed independently, but only the new
59 * lock will have the right to update the index (unless it also forks).
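* As an illustration, the rollover decision reduces to a claim-or-fork check. This is a
* simplified sketch of the logic in launch_dedupe() below (the real function also honors an
* increment the agent may have claimed in advance):
*
*     if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
*             fork_hash_lock(lock, data_vio);
*             return;
*     }
*     set_duplicate_location(data_vio, lock->duplicate);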
61 * Since rollover happens in a lock instance, once a valid data location has been selected, it will
62 * not change. QUERYING and WRITING are only performed once per lock lifetime. All other
63 * non-endpoint states can be re-entered.
65 * The function names in this module follow a convention referencing the states and transitions in
66 * the state machine. For example, for the LOCKING state, there are start_locking() and
67 * finish_locking() functions. start_locking() is invoked by the finish function of the state (or
68 * states) that transition to LOCKING. It performs the actual lock state change and must be invoked
69 * on the hash zone thread. finish_locking() is called by (or continued via callback from) the
70 * code actually obtaining the lock. It does any bookkeeping or decision-making required and
71 * invokes the appropriate start function of the state being transitioned to after LOCKING.
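* As a sketch of that convention, a hypothetical state FROBBING (the names VDO_HASH_LOCK_FROBBING,
* start_frobbing, finish_frobbing, frob_in_other_zone, and start_next_state below are illustrative
* only, not part of the real state machine) would be spelled roughly as:
*
*     static void start_frobbing(struct hash_lock *lock, struct data_vio *agent)
*     {
*             lock->state = VDO_HASH_LOCK_FROBBING;
*             launch_data_vio_duplicate_zone_callback(agent, frob_in_other_zone);
*     }
*
*     static void finish_frobbing(struct vdo_completion *completion)
*     {
*             struct data_vio *agent = as_data_vio(completion);
*
*             assert_hash_lock_agent(agent, __func__);
*             start_next_state(agent->hash_lock, agent);
*     }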
73 * ----------------------------------------------------------------------
77 * A query to the UDS index is handled asynchronously by the index's threads. When the query is
78 * complete, a callback supplied with the query will be called from one of those threads. Under
79 * heavy system load, the index may be slower to respond than is desirable for reasonable I/O
80 * throughput. Since deduplication of writes is not necessary for correct operation of a VDO
81 * device, it is acceptable to time out slow index queries and proceed to fulfill a write
82 * request without deduplicating. However, because the uds_request struct itself is supplied by the
83 * caller, we cannot simply reuse a uds_request object which we have chosen to time out. Hence,
84 * each hash_zone maintains a pool of dedupe_contexts which each contain a uds_request along with a
85 * reference to the data_vio on behalf of which they are performing a query.
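* As a simplified sketch, borrowing an idle context from a zone amounts to (the real acquisition
* path, described below, also falls back to recycling timed-out contexts):
*
*     struct dedupe_context *context =
*             list_first_entry_or_null(&zone->available, struct dedupe_context, list_entry);
*
*     if (context != NULL) {
*             list_del_init(&context->list_entry);
*             WRITE_ONCE(zone->active, zone->active + 1);
*     }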
87 * When a hash_lock needs to query the index, it attempts to acquire an unused dedupe_context from
88 * its hash_zone's pool. If one is available, that context is prepared, associated with the
89 * hash_lock's agent, added to the list of pending contexts, and then sent to the index. The
90 * context's state will be transitioned from DEDUPE_CONTEXT_IDLE to DEDUPE_CONTEXT_PENDING. If all
91 * goes well, the dedupe callback will be called by the index which will change the context's state
92 * to DEDUPE_CONTEXT_COMPLETE, and the associated data_vio will be enqueued to run back in the hash
93 * zone where the query results will be processed and the context will be put back in the idle
94 * state and returned to the hash_zone's available list.
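* Each of those state changes is made with an atomic compare-and-swap (see change_context_state()
* below), so the index callback and the timeout handler can never both claim the same context. The
* completion side is roughly (simplified; the real callback re-queues the requestor on its hash
* zone thread rather than continuing it directly):
*
*     if (change_context_state(context, DEDUPE_CONTEXT_PENDING, DEDUPE_CONTEXT_COMPLETE))
*             continue_data_vio(context->requestor);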
96 * The first time an index query is launched from a given hash_zone, a timer is started. When the
97 * timer fires, the hash_zone's completion is enqueued to run in the hash_zone where the zone's
98 * pending list will be searched for any contexts in the pending state which have been running for
99 * too long. Those contexts are transitioned to the DEDUPE_CONTEXT_TIMED_OUT state and moved to the
100 * zone's timed_out list, where they won't be examined again if there is a subsequent time out. The
101 * data_vios associated with timed out contexts are sent to continue processing their write
102 * operation without deduplicating. The timer is also restarted.
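* The "running for too long" test is a plain jiffies comparison against the time recorded in each
* context when its query was submitted (the context's submission_jiffies field); roughly, as a
* sketch (since the pending list is kept in submission order, the real scan can also stop at the
* first context that has not yet expired):
*
*     if (jiffies - context->submission_jiffies >= vdo_dedupe_index_timeout_jiffies)
*             change_context_state(context, DEDUPE_CONTEXT_PENDING,
*                                  DEDUPE_CONTEXT_TIMED_OUT);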
104 * When the dedupe callback is run for a context which is in the timed out state, that context is
105 * moved to the DEDUPE_CONTEXT_TIMED_OUT_COMPLETE state. No other action need be taken as the
106 * associated data_vios have already been dispatched.
108 * If a hash_lock needs a dedupe context, and the available list is empty, the timed_out list will
109 * be searched for any contexts which are timed out and complete. One of these will be used
110 * immediately, and the rest will be returned to the available list and marked idle.
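* A sketch of that recycling pass (simplified; the real code also keeps one of the recycled
* contexts to satisfy the current request):
*
*     list_for_each_entry_safe(context, tmp, &zone->timed_out, list_entry) {
*             if (change_context_state(context, DEDUPE_CONTEXT_TIMED_OUT_COMPLETE,
*                                      DEDUPE_CONTEXT_IDLE))
*                     list_move(&context->list_entry, &zone->available);
*     }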
115 #include <linux/atomic.h>
116 #include <linux/jiffies.h>
117 #include <linux/kernel.h>
118 #include <linux/kobject.h>
119 #include <linux/list.h>
120 #include <linux/ratelimit.h>
121 #include <linux/spinlock.h>
122 #include <linux/timer.h>
125 #include "memory-alloc.h"
127 #include "permassert.h"
128 #include "string-utils.h"
131 #include "action-manager.h"
132 #include "admin-state.h"
133 #include "completion.h"
134 #include "constants.h"
135 #include "data-vio.h"
137 #include "io-submitter.h"
139 #include "physical-zone.h"
140 #include "slab-depot.h"
141 #include "statistics.h"
144 #include "wait-queue.h"
146 struct uds_attribute {
147 struct attribute attr;
148 const char *(*show_string)(struct hash_zones *hash_zones);
152 DEDUPE_QUERY_TIMER_IDLE,
153 DEDUPE_QUERY_TIMER_RUNNING,
154 DEDUPE_QUERY_TIMER_FIRED,
157 enum dedupe_context_state {
158 DEDUPE_CONTEXT_IDLE,
159 DEDUPE_CONTEXT_PENDING,
160 DEDUPE_CONTEXT_TIMED_OUT,
161 DEDUPE_CONTEXT_COMPLETE,
162 DEDUPE_CONTEXT_TIMED_OUT_COMPLETE,
165 /* Possible index states: closed, opened, or transitioning between those two. */
172 static const char *CLOSED = "closed";
173 static const char *CLOSING = "closing";
174 static const char *ERROR = "error";
175 static const char *OFFLINE = "offline";
176 static const char *ONLINE = "online";
177 static const char *OPENING = "opening";
178 static const char *SUSPENDED = "suspended";
179 static const char *UNKNOWN = "unknown";
181 /* Version 2 uses the kernel space UDS index and is limited to 16 bytes */
183 UDS_ADVICE_VERSION = 2,
184 /* version byte + state byte + 64-bit little-endian PBN */
185 UDS_ADVICE_SIZE = 1 + 1 + sizeof(u64),
188 enum hash_lock_state {
189 /* State for locks that are not in use or are being initialized. */
190 VDO_HASH_LOCK_INITIALIZING,
192 /* This is the sequence of states typically used on the non-dedupe path. */
193 VDO_HASH_LOCK_QUERYING,
194 VDO_HASH_LOCK_WRITING,
195 VDO_HASH_LOCK_UPDATING,
197 /* The remaining states are typically used on the dedupe path in this order. */
198 VDO_HASH_LOCK_LOCKING,
199 VDO_HASH_LOCK_VERIFYING,
200 VDO_HASH_LOCK_DEDUPING,
201 VDO_HASH_LOCK_UNLOCKING,
204 * Terminal state for locks returning to the pool. Must be last both because it's the final
205 * state, and also because it's used to count the states.
207 VDO_HASH_LOCK_BYPASSING,
210 static const char * const LOCK_STATE_NAMES[] = {
211 [VDO_HASH_LOCK_BYPASSING] = "BYPASSING",
212 [VDO_HASH_LOCK_DEDUPING] = "DEDUPING",
213 [VDO_HASH_LOCK_INITIALIZING] = "INITIALIZING",
214 [VDO_HASH_LOCK_LOCKING] = "LOCKING",
215 [VDO_HASH_LOCK_QUERYING] = "QUERYING",
216 [VDO_HASH_LOCK_UNLOCKING] = "UNLOCKING",
217 [VDO_HASH_LOCK_UPDATING] = "UPDATING",
218 [VDO_HASH_LOCK_VERIFYING] = "VERIFYING",
219 [VDO_HASH_LOCK_WRITING] = "WRITING",
223 /* The block hash covered by this lock */
224 struct uds_record_name hash;
226 /* When the lock is unused, this list entry allows the lock to be pooled */
227 struct list_head pool_node;
230 * A list containing the data VIOs sharing this lock, all having the same record name and
231 * data block contents, linked by their hash_lock_entry fields.
233 struct list_head duplicate_ring;
235 /* The number of data_vios sharing this lock instance */
236 data_vio_count_t reference_count;
238 /* The maximum value of reference_count in the lifetime of this lock */
239 data_vio_count_t max_references;
241 /* The current state of this lock */
242 enum hash_lock_state state;
244 /* True if the UDS index should be updated with new advice */
247 /* True if the advice has been verified to be a true duplicate */
250 /* True if the lock has already accounted for an initial verification */
253 /* True if this lock is registered in the lock map (cleared on rollover) */
257 * If verified is false, this is the location of a possible duplicate. If verified is true,
258 * it is the verified location of a true duplicate.
260 struct zoned_pbn duplicate;
262 /* The PBN lock on the block containing the duplicate data */
263 struct pbn_lock *duplicate_lock;
265 /* The data_vio designated to act on behalf of the lock */
266 struct data_vio *agent;
269 * Other data_vios with data identical to the agent's, which are currently waiting for the agent
270 * to get the information they all need to deduplicate--either against each other, or
271 * against an existing duplicate on disk.
273 struct vdo_wait_queue waiters;
277 LOCK_POOL_CAPACITY = MAXIMUM_VDO_USER_VIOS,
281 struct action_manager *manager;
282 struct kobject dedupe_directory;
283 struct uds_parameters parameters;
284 struct uds_index_session *index_session;
285 struct ratelimit_state ratelimiter;
287 atomic64_t dedupe_context_busy;
289 /* This spinlock protects the state fields and the starting of dedupe requests. */
292 /* The fields in the next block are all protected by the lock */
293 struct vdo_completion completion;
294 enum index_state index_state;
295 enum index_state index_target;
296 struct admin_state state;
301 u64 reported_timeouts;
303 /* The number of zones */
304 zone_count_t zone_count;
305 /* The hash zones themselves */
306 struct hash_zone zones[];
309 /* These are in milliseconds. */
310 unsigned int vdo_dedupe_index_timeout_interval = 5000;
311 unsigned int vdo_dedupe_index_min_timer_interval = 100;
312 /* Same two variables, in jiffies for easier consumption. */
313 static u64 vdo_dedupe_index_timeout_jiffies;
314 static u64 vdo_dedupe_index_min_timer_jiffies;
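/*
 * A sketch of how the millisecond tunables above are converted when they are set
 * (msecs_to_jiffies() is the standard kernel helper; the clamping the real setters apply is
 * omitted here):
 *
 *     vdo_dedupe_index_timeout_jiffies = msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
 *     vdo_dedupe_index_min_timer_jiffies = msecs_to_jiffies(vdo_dedupe_index_min_timer_interval);
 */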
316 static inline struct hash_zone *as_hash_zone(struct vdo_completion *completion)
318 vdo_assert_completion_type(completion, VDO_HASH_ZONE_COMPLETION);
319 return container_of(completion, struct hash_zone, completion);
322 static inline struct hash_zones *as_hash_zones(struct vdo_completion *completion)
324 vdo_assert_completion_type(completion, VDO_HASH_ZONES_COMPLETION);
325 return container_of(completion, struct hash_zones, completion);
328 static inline void assert_in_hash_zone(struct hash_zone *zone, const char *name)
330 ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id),
331 "%s called on hash zone thread", name);
334 static inline bool change_context_state(struct dedupe_context *context, int old, int new)
336 return (atomic_cmpxchg(&context->state, old, new) == old);
339 static inline bool change_timer_state(struct hash_zone *zone, int old, int new)
341 return (atomic_cmpxchg(&zone->timer_state, old, new) == old);
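/*
 * Illustrative use of change_timer_state(): only the caller that wins the IDLE -> RUNNING
 * transition arms the timer, so concurrent query launches cannot double-arm it. A simplified
 * sketch, assuming the zone's timer_list field is named timer as declared in dedupe.h (the real
 * timer-start path computes the expiration from the oldest pending request):
 *
 *     if (change_timer_state(zone, DEDUPE_QUERY_TIMER_IDLE, DEDUPE_QUERY_TIMER_RUNNING))
 *             mod_timer(&zone->timer, jiffies + vdo_dedupe_index_min_timer_jiffies);
 */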
345 * return_hash_lock_to_pool() - (Re)initialize a hash lock and return it to its pool.
346 * @zone: The zone from which the lock was borrowed.
347 * @lock: The lock that is no longer in use.
349 static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock)
351 memset(lock, 0, sizeof(*lock));
352 INIT_LIST_HEAD(&lock->pool_node);
353 INIT_LIST_HEAD(&lock->duplicate_ring);
354 vdo_waitq_init(&lock->waiters);
355 list_add_tail(&lock->pool_node, &zone->lock_pool);
359 * vdo_get_duplicate_lock() - Get the PBN lock on the duplicate data location for a data_vio from
360 * the hash_lock the data_vio holds (if there is one).
361 * @data_vio: The data_vio to query.
363 * Return: The PBN lock on the data_vio's duplicate location.
365 struct pbn_lock *vdo_get_duplicate_lock(struct data_vio *data_vio)
367 if (data_vio->hash_lock == NULL)
368 return NULL;
370 return data_vio->hash_lock->duplicate_lock;
374 * hash_lock_key() - Return hash_lock's record name as a hash code.
375 * @lock: The hash lock.
377 * Return: The key to use for the int map.
379 static inline u64 hash_lock_key(struct hash_lock *lock)
381 return get_unaligned_le64(&lock->hash.name);
385 * get_hash_lock_state_name() - Get the string representation of a hash lock state.
386 * @state: The hash lock state.
388 * Return: The short string representing the state
390 static const char *get_hash_lock_state_name(enum hash_lock_state state)
392 /* Catch if a state has been added without updating the name array. */
393 BUILD_BUG_ON((VDO_HASH_LOCK_BYPASSING + 1) != ARRAY_SIZE(LOCK_STATE_NAMES));
394 return (state < ARRAY_SIZE(LOCK_STATE_NAMES)) ? LOCK_STATE_NAMES[state] : "INVALID";
398 * assert_hash_lock_agent() - Assert that a data_vio is the agent of its hash lock, and that this
399 * is being called in the hash zone.
400 * @data_vio: The data_vio expected to be the lock agent.
401 * @where: A string describing the function making the assertion.
403 static void assert_hash_lock_agent(struct data_vio *data_vio, const char *where)
405 /* Not safe to access the agent field except from the hash zone. */
406 assert_data_vio_in_hash_zone(data_vio);
407 ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent,
408 "%s must be for the hash lock agent", where);
412 * set_duplicate_lock() - Set the duplicate lock held by a hash lock. May only be called in the
413 * physical zone of the PBN lock.
414 * @hash_lock: The hash lock to update.
415 * @pbn_lock: The PBN read lock to use as the duplicate lock.
417 static void set_duplicate_lock(struct hash_lock *hash_lock, struct pbn_lock *pbn_lock)
419 ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL),
420 "hash lock must not already hold a duplicate lock");
422 pbn_lock->holder_count += 1;
423 hash_lock->duplicate_lock = pbn_lock;
427 * dequeue_lock_waiter() - Remove the first data_vio from the lock's waitq and return it.
428 * @lock: The lock containing the wait queue.
430 * Return: The first (oldest) waiter in the queue, or NULL if the queue is empty.
432 static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock)
434 return vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
438 * set_hash_lock() - Set, change, or clear the hash lock a data_vio is using.
439 * @data_vio: The data_vio to update.
440 * @new_lock: The hash lock the data_vio is joining.
442 * Updates the hash lock (or locks) to reflect the change in membership.
444 static void set_hash_lock(struct data_vio *data_vio, struct hash_lock *new_lock)
446 struct hash_lock *old_lock = data_vio->hash_lock;
448 if (old_lock != NULL) {
449 ASSERT_LOG_ONLY(data_vio->hash_zone != NULL,
450 "must have a hash zone when holding a hash lock");
451 ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry),
452 "must be on a hash lock ring when holding a hash lock");
453 ASSERT_LOG_ONLY(old_lock->reference_count > 0,
454 "hash lock reference must be counted");
456 if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) &&
457 (old_lock->state != VDO_HASH_LOCK_UNLOCKING)) {
459 * If the reference count goes to zero in a non-terminal state, we're most
460 * likely leaking this lock.
462 ASSERT_LOG_ONLY(old_lock->reference_count > 1,
463 "hash locks should only become unreferenced in a terminal state, not state %s",
464 get_hash_lock_state_name(old_lock->state));
467 list_del_init(&data_vio->hash_lock_entry);
468 old_lock->reference_count -= 1;
470 data_vio->hash_lock = NULL;
473 if (new_lock != NULL) {
475 * Keep all data_vios sharing the lock on a ring since they can complete in any
476 * order and we'll always need a pointer to one to compare data.
478 list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_ring);
479 new_lock->reference_count += 1;
480 if (new_lock->max_references < new_lock->reference_count)
481 new_lock->max_references = new_lock->reference_count;
483 data_vio->hash_lock = new_lock;
487 /* There are loops in the state diagram, so some forward decl's are needed. */
488 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
490 static void start_locking(struct hash_lock *lock, struct data_vio *agent);
491 static void start_writing(struct hash_lock *lock, struct data_vio *agent);
492 static void unlock_duplicate_pbn(struct vdo_completion *completion);
493 static void transfer_allocation_lock(struct data_vio *data_vio);
496 * exit_hash_lock() - Bottleneck for data_vios that have written or deduplicated and that are no
497 * longer needed to be an agent for the hash lock.
498 * @data_vio: The data_vio to complete and send to be cleaned up.
500 static void exit_hash_lock(struct data_vio *data_vio)
502 /* Release the hash lock now, saving a thread transition in cleanup. */
503 vdo_release_hash_lock(data_vio);
505 /* Complete the data_vio and start the clean-up path to release any locks it still holds. */
506 data_vio->vio.completion.callback = complete_data_vio;
508 continue_data_vio(data_vio);
512 * set_duplicate_location() - Set the location of the duplicate block for data_vio, updating the
513 * is_duplicate and duplicate fields from a zoned_pbn.
514 * @data_vio: The data_vio to modify.
515 * @source: The location of the duplicate.
517 static void set_duplicate_location(struct data_vio *data_vio,
518 const struct zoned_pbn source)
520 data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK);
521 data_vio->duplicate = source;
525 * retire_lock_agent() - Retire the active lock agent, replacing it with the first lock waiter, and
526 * make the retired agent exit the hash lock.
527 * @lock: The hash lock to update.
529 * Return: The new lock agent (which will be NULL if there was no waiter)
531 static struct data_vio *retire_lock_agent(struct hash_lock *lock)
533 struct data_vio *old_agent = lock->agent;
534 struct data_vio *new_agent = dequeue_lock_waiter(lock);
536 lock->agent = new_agent;
537 exit_hash_lock(old_agent);
538 if (new_agent != NULL)
539 set_duplicate_location(new_agent, lock->duplicate);
544 * wait_on_hash_lock() - Add a data_vio to the lock's queue of waiters.
545 * @lock: The hash lock on which to wait.
546 * @data_vio: The data_vio to add to the queue.
548 static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio)
550 vdo_waitq_enqueue_waiter(&lock->waiters, &data_vio->waiter);
553 * Make sure the agent doesn't block indefinitely in the packer since it now has at least
554 * one other data_vio waiting on it.
556 if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent))
560 * Even though we're waiting, we also have to send ourselves as a one-way message to the
561 * packer to ensure the agent continues executing. This is safe because
562 * cancel_data_vio_compression() guarantees the agent won't continue executing until this
563 * message arrives in the packer, and because the wait queue link isn't used for sending
564 * the message.
566 data_vio->compression.lock_holder = lock->agent;
567 launch_data_vio_packer_callback(data_vio, vdo_remove_lock_holder_from_packer);
571 * abort_waiter() - waiter_callback_fn function that shunts waiters to write their blocks without
572 * attempting to deduplicate.
573 * @waiter: The data_vio's waiter link.
574 * @context: Not used.
576 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
578 write_data_vio(vdo_waiter_as_data_vio(waiter));
582 * start_bypassing() - Stop using the hash lock.
583 * @lock: The hash lock.
584 * @agent: The data_vio acting as the agent for the lock.
586 * Stops using the hash lock. This is the final transition for hash locks which did not get an
587 * error.
589 static void start_bypassing(struct hash_lock *lock, struct data_vio *agent)
591 lock->state = VDO_HASH_LOCK_BYPASSING;
592 exit_hash_lock(agent);
595 void vdo_clean_failed_hash_lock(struct data_vio *data_vio)
597 struct hash_lock *lock = data_vio->hash_lock;
599 if (lock->state == VDO_HASH_LOCK_BYPASSING) {
600 exit_hash_lock(data_vio);
604 if (lock->agent == NULL) {
605 lock->agent = data_vio;
606 } else if (data_vio != lock->agent) {
607 exit_hash_lock(data_vio);
611 lock->state = VDO_HASH_LOCK_BYPASSING;
613 /* Ensure we don't attempt to update advice when cleaning up. */
614 lock->update_advice = false;
616 vdo_waitq_notify_all_waiters(&lock->waiters, abort_waiter, NULL);
618 if (lock->duplicate_lock != NULL) {
619 /* The agent must reference the duplicate zone to launch it. */
620 data_vio->duplicate = lock->duplicate;
621 launch_data_vio_duplicate_zone_callback(data_vio, unlock_duplicate_pbn);
626 data_vio->is_duplicate = false;
627 exit_hash_lock(data_vio);
631 * finish_unlocking() - Handle the result of the agent for the lock releasing a read lock on
632 * duplicate candidate.
633 * @completion: The completion of the data_vio acting as the lock's agent.
635 * This continuation is registered in unlock_duplicate_pbn().
637 static void finish_unlocking(struct vdo_completion *completion)
639 struct data_vio *agent = as_data_vio(completion);
640 struct hash_lock *lock = agent->hash_lock;
642 assert_hash_lock_agent(agent, __func__);
644 ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
645 "must have released the duplicate lock for the hash lock");
647 if (!lock->verified) {
649 * UNLOCKING -> WRITING transition: The lock we released was on an unverified
650 * block, so it must have been a lock on advice we were verifying, not on a
651 * location that was used for deduplication. Go write (or compress) the block to
652 * get a location to dedupe against.
654 start_writing(lock, agent);
659 * With the lock released, the verified duplicate block may already have changed and will
660 * need to be re-verified if a waiter arrived.
662 lock->verified = false;
664 if (vdo_waitq_has_waiters(&lock->waiters)) {
666 * UNLOCKING -> LOCKING transition: A new data_vio entered the hash lock while the
667 * agent was releasing the PBN lock. The current agent exits and the waiter has to
668 * re-lock and re-verify the duplicate location.
670 * TODO: If we used the current agent to re-acquire the PBN lock we wouldn't need
671 * to re-verify.
673 agent = retire_lock_agent(lock);
674 start_locking(lock, agent);
679 * UNLOCKING -> BYPASSING transition: The agent is done with the lock and no other
680 * data_vios reference it, so remove it from the lock map and return it to the pool.
682 start_bypassing(lock, agent);
686 * unlock_duplicate_pbn() - Release a read lock on the PBN of the block that may or may not have
687 * contained duplicate data.
688 * @completion: The completion of the data_vio acting as the lock's agent.
690 * This continuation is launched by start_unlocking(), and calls back to finish_unlocking() on the
691 * hash zone thread.
693 static void unlock_duplicate_pbn(struct vdo_completion *completion)
695 struct data_vio *agent = as_data_vio(completion);
696 struct hash_lock *lock = agent->hash_lock;
698 assert_data_vio_in_duplicate_zone(agent);
699 ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
700 "must have a duplicate lock to release");
702 vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn,
703 uds_forget(lock->duplicate_lock));
704 if (lock->state == VDO_HASH_LOCK_BYPASSING) {
705 complete_data_vio(completion);
709 launch_data_vio_hash_zone_callback(agent, finish_unlocking);
713 * start_unlocking() - Release a read lock on the PBN of the block that may or may not have
714 * contained duplicate data.
715 * @lock: The hash lock.
716 * @agent: The data_vio currently acting as the agent for the lock.
718 static void start_unlocking(struct hash_lock *lock, struct data_vio *agent)
720 lock->state = VDO_HASH_LOCK_UNLOCKING;
721 launch_data_vio_duplicate_zone_callback(agent, unlock_duplicate_pbn);
724 static void release_context(struct dedupe_context *context)
726 struct hash_zone *zone = context->zone;
728 WRITE_ONCE(zone->active, zone->active - 1);
729 list_move(&context->list_entry, &zone->available);
732 static void process_update_result(struct data_vio *agent)
734 struct dedupe_context *context = agent->dedupe_context;
736 if ((context == NULL) ||
737 !change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE))
740 release_context(context);
744 * finish_updating() - Process the result of a UDS update performed by the agent for the lock.
745 * @completion: The completion of the data_vio that performed the update
747 * This continuation is registered in start_updating().
749 static void finish_updating(struct vdo_completion *completion)
751 struct data_vio *agent = as_data_vio(completion);
752 struct hash_lock *lock = agent->hash_lock;
754 assert_hash_lock_agent(agent, __func__);
756 process_update_result(agent);
759 * UDS was updated successfully, so don't update again unless the duplicate location
760 * changes due to rollover.
762 lock->update_advice = false;
764 if (vdo_waitq_has_waiters(&lock->waiters)) {
766 * UPDATING -> DEDUPING transition: A new data_vio arrived during the UDS update.
767 * Send it on the verified dedupe path. The agent is done with the lock, but the
768 * lock may still need to use it to clean up after rollover.
770 start_deduping(lock, agent, true);
774 if (lock->duplicate_lock != NULL) {
776 * UPDATING -> UNLOCKING transition: No one is waiting to dedupe, but we hold a
777 * duplicate PBN lock, so go release it.
779 start_unlocking(lock, agent);
784 * UPDATING -> BYPASSING transition: No one is waiting to dedupe and there's no lock to
785 * release.
787 start_bypassing(lock, agent);
790 static void query_index(struct data_vio *data_vio, enum uds_request_type operation);
793 * start_updating() - Continue deduplication with the last step, updating UDS with the location of
794 * the duplicate that should be returned as advice in the future.
795 * @lock: The hash lock.
796 * @agent: The data_vio currently acting as the agent for the lock.
798 static void start_updating(struct hash_lock *lock, struct data_vio *agent)
800 lock->state = VDO_HASH_LOCK_UPDATING;
802 ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified");
803 ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed");
805 agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX;
806 set_data_vio_hash_zone_callback(agent, finish_updating);
807 query_index(agent, UDS_UPDATE);
811 * finish_deduping() - Handle a data_vio that has finished deduplicating against the block locked
812 * by the hash lock.
813 * @lock: The hash lock.
814 * @data_vio: The lock holder that has finished deduplicating.
816 * If there are other data_vios still sharing the lock, this will just release the data_vio's share
817 * of the lock and finish processing the data_vio. If this is the last data_vio holding the lock,
818 * this makes the data_vio the lock agent and uses it to advance the state of the lock so it can
819 * eventually be released.
821 static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio)
823 struct data_vio *agent = data_vio;
825 ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING");
826 ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
827 "shouldn't have any lock waiters in DEDUPING");
829 /* Just release the lock reference if other data_vios are still deduping. */
830 if (lock->reference_count > 1) {
831 exit_hash_lock(data_vio);
835 /* The hash lock must have an agent for all other lock states. */
836 lock->agent = agent;
837 if (lock->update_advice) {
839 * DEDUPING -> UPDATING transition: The location of the duplicate block changed
840 * since the initial UDS query because of compression, rollover, or because the
841 * query agent didn't have an allocation. The UDS update was delayed in case there
842 * was another change in location, but with only this data_vio using the hash lock,
843 * it's time to update the advice.
845 start_updating(lock, agent);
848 * DEDUPING -> UNLOCKING transition: Release the PBN read lock on the duplicate
849 * location so the hash lock itself can be released (contingent on no new data_vios
850 * arriving in the lock before the agent returns).
852 start_unlocking(lock, agent);
857 * acquire_lock() - Get the lock for a record name.
858 * @zone: The zone responsible for the hash.
859 * @hash: The hash to lock.
860 * @replace_lock: If non-NULL, the lock already registered for the hash which should be replaced by
861 * the new lock.
862 * @lock_ptr: A pointer to receive the hash lock.
864 * Gets the lock for the hash (record name) of the data in a data_vio, or if one does not exist (or
865 * if we are explicitly rolling over), initialize a new lock for the hash and register it in the
866 * zone. This must only be called in the correct thread for the zone.
868 * Return: VDO_SUCCESS or an error code.
870 static int __must_check acquire_lock(struct hash_zone *zone,
871 const struct uds_record_name *hash,
872 struct hash_lock *replace_lock,
873 struct hash_lock **lock_ptr)
875 struct hash_lock *lock, *new_lock;
879 * Borrow and prepare a lock from the pool so we don't have to do two int_map accesses
880 * in the common case of no lock contention.
882 result = ASSERT(!list_empty(&zone->lock_pool),
883 "never need to wait for a free hash lock");
884 if (result != VDO_SUCCESS)
887 new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node);
888 list_del_init(&new_lock->pool_node);
891 * Fill in the hash of the new lock so we can map it, since we have to use the hash as the
892 * map key.
894 new_lock->hash = *hash;
896 result = vdo_int_map_put(zone->hash_lock_map, hash_lock_key(new_lock),
897 new_lock, (replace_lock != NULL), (void **) &lock);
898 if (result != VDO_SUCCESS) {
899 return_hash_lock_to_pool(zone, uds_forget(new_lock));
903 if (replace_lock != NULL) {
904 /* On mismatch put the old lock back and return a severe error */
905 ASSERT_LOG_ONLY(lock == replace_lock,
906 "old lock must have been in the lock map");
907 /* TODO: Check earlier and bail out? */
908 ASSERT_LOG_ONLY(replace_lock->registered,
909 "old lock must have been marked registered");
910 replace_lock->registered = false;
913 if (lock == replace_lock) {
914 lock = new_lock;
915 lock->registered = true;
917 /* There's already a lock for the hash, so we don't need the borrowed lock. */
918 return_hash_lock_to_pool(zone, uds_forget(new_lock));
926 * enter_forked_lock() - Bind the data_vio to a new hash lock.
928 * Implements waiter_callback_fn. Binds the data_vio that was waiting to a new hash lock and waits
929 * on that lock.
931 static void enter_forked_lock(struct vdo_waiter *waiter, void *context)
933 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
934 struct hash_lock *new_lock = context;
936 set_hash_lock(data_vio, new_lock);
937 wait_on_hash_lock(new_lock, data_vio);
941 * fork_hash_lock() - Fork a hash lock because it has run out of increments on the duplicate PBN.
942 * @old_lock: The hash lock to fork.
943 * @new_agent: The data_vio that will be the agent for the new lock.
945 * Transfers the new agent and any lock waiters to a new hash lock instance which takes the place
946 * of the old lock in the lock map. The old lock remains active, but will not update advice.
948 static void fork_hash_lock(struct hash_lock *old_lock, struct data_vio *new_agent)
950 struct hash_lock *new_lock;
953 result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock,
955 if (result != VDO_SUCCESS) {
956 continue_data_vio_with_error(new_agent, result);
961 * Only one of the two locks should update UDS. The old lock is out of references, so it
962 * would be poor dedupe advice in the short term.
964 old_lock->update_advice = false;
965 new_lock->update_advice = true;
967 set_hash_lock(new_agent, new_lock);
968 new_lock->agent = new_agent;
970 vdo_waitq_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock);
972 new_agent->is_duplicate = false;
973 start_writing(new_lock, new_agent);
977 * launch_dedupe() - Reserve a reference count increment for a data_vio and launch it on the dedupe
978 * path.
979 * @lock: The hash lock.
980 * @data_vio: The data_vio to deduplicate using the hash lock.
981 * @has_claim: true if the data_vio already has claimed an increment from the duplicate lock.
983 * If no increments are available, this will roll over to a new hash lock and launch the data_vio
984 * as the writing agent for that lock.
986 static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio,
989 if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
990 /* Out of increments, so must roll over to a new lock. */
991 fork_hash_lock(lock, data_vio);
995 /* Deduplicate against the lock's verified location. */
996 set_duplicate_location(data_vio, lock->duplicate);
997 data_vio->new_mapped = data_vio->duplicate;
998 update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock);
1002 * start_deduping() - Enter the hash lock state where data_vios deduplicate in parallel against a
1003 * true copy of their data on disk.
1004 * @lock: The hash lock.
1005 * @agent: The data_vio acting as the agent for the lock.
1006 * @agent_is_done: true only if the agent has already written or deduplicated against its data.
1008 * If the agent itself needs to deduplicate, an increment for it must already have been claimed
1009 * from the duplicate lock, ensuring the hash lock will still have a data_vio holding it.
1011 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
1014 lock->state = VDO_HASH_LOCK_DEDUPING;
1017 * We don't take the downgraded allocation lock from the agent unless we actually need to
1018 * deduplicate against it.
1020 if (lock->duplicate_lock == NULL) {
1021 ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state),
1022 "compression must have shared a lock");
1023 ASSERT_LOG_ONLY(agent_is_done,
1024 "agent must have written the new duplicate");
1025 transfer_allocation_lock(agent);
1028 ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock),
1029 "duplicate_lock must be a PBN read lock");
1032 * This state is not like any of the other states. There is no designated agent--the agent
1033 * transitioning to this state and all the waiters will be launched to deduplicate in
1034 * parallel.
1039 * Launch the agent (if not already deduplicated) and as many lock waiters as we have
1040 * available increments for on the dedupe path. If we run out of increments, rollover will
1041 * be triggered and the remaining waiters will be transferred to the new lock.
1043 if (!agent_is_done) {
1044 launch_dedupe(lock, agent, true);
1047 while (vdo_waitq_has_waiters(&lock->waiters))
1048 launch_dedupe(lock, dequeue_lock_waiter(lock), false);
1050 if (agent_is_done) {
1052 * In the degenerate case where all the waiters rolled over to a new lock, this
1053 * will continue to use the old agent to clean up this lock, and otherwise it just
1054 * lets the agent exit the lock.
1056 finish_deduping(lock, agent);
1061 * increment_stat() - Increment a statistic counter in a non-atomic yet thread-safe manner.
1062 * @stat: The statistic field to increment.
1064 static void increment_stat(u64 *stat)
1067 * Must only be mutated on the hash zone thread. Prevents any compiler shenanigans from
1068 * affecting other threads reading stats.
1070 WRITE_ONCE(*stat, *stat + 1);
1074 * finish_verifying() - Handle the result of the agent for the lock comparing its data to the
1075 * duplicate candidate.
1076 * @completion: The completion of the data_vio used to verify dedupe
1078 * This continuation is registered in start_verifying().
1080 static void finish_verifying(struct vdo_completion *completion)
1082 struct data_vio *agent = as_data_vio(completion);
1083 struct hash_lock *lock = agent->hash_lock;
1085 assert_hash_lock_agent(agent, __func__);
1087 lock->verified = agent->is_duplicate;
1090 * Only count the result of the initial verification of the advice as valid or stale, and
1091 * not any re-verifications due to PBN lock releases.
1093 if (!lock->verify_counted) {
1094 lock->verify_counted = true;
1095 if (lock->verified)
1096 increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid);
1097 else
1098 increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1102 * Even if the block is a verified duplicate, we can't start to deduplicate unless we can
1103 * claim a reference count increment for the agent.
1105 if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1106 agent->is_duplicate = false;
1107 lock->verified = false;
1110 if (lock->verified) {
1112 * VERIFYING -> DEDUPING transition: The advice is for a true duplicate, so start
1113 * deduplicating against it, if references are available.
1115 start_deduping(lock, agent, false);
1118 * VERIFYING -> UNLOCKING transition: Either the verify failed or we'd try to
1119 * dedupe and roll over immediately, which would fail because it would leave the
1120 * lock without an agent to release the PBN lock. In both cases, the data will have
1121 * to be written or compressed, but first the advice PBN must be unlocked by the
1122 * VERIFYING agent.
1124 lock->update_advice = true;
1125 start_unlocking(lock, agent);
1129 static bool blocks_equal(char *block1, char *block2)
1134 for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
1135 if (*((u64 *) &block1[i]) != *((u64 *) &block2[i]))
1142 static void verify_callback(struct vdo_completion *completion)
1144 struct data_vio *agent = as_data_vio(completion);
1146 agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block);
1147 launch_data_vio_hash_zone_callback(agent, finish_verifying);
1150 static void uncompress_and_verify(struct vdo_completion *completion)
1152 struct data_vio *agent = as_data_vio(completion);
1155 result = uncompress_data_vio(agent, agent->duplicate.state,
1156 agent->scratch_block);
1157 if (result == VDO_SUCCESS) {
1158 verify_callback(completion);
1162 agent->is_duplicate = false;
1163 launch_data_vio_hash_zone_callback(agent, finish_verifying);
1166 static void verify_endio(struct bio *bio)
1168 struct data_vio *agent = vio_as_data_vio(bio->bi_private);
1169 int result = blk_status_to_errno(bio->bi_status);
1171 vdo_count_completed_bios(bio);
1172 if (result != VDO_SUCCESS) {
1173 agent->is_duplicate = false;
1174 launch_data_vio_hash_zone_callback(agent, finish_verifying);
1178 if (vdo_is_state_compressed(agent->duplicate.state)) {
1179 launch_data_vio_cpu_callback(agent, uncompress_and_verify,
1180 CPU_Q_COMPRESS_BLOCK_PRIORITY);
1184 launch_data_vio_cpu_callback(agent, verify_callback,
1185 CPU_Q_COMPLETE_READ_PRIORITY);
1189 * start_verifying() - Begin the data verification phase.
1190 * @lock: The hash lock (must be LOCKING).
1191 * @agent: The data_vio to use to read and compare candidate data.
1193 * Continue the deduplication path for a hash lock by using the agent to read (and possibly
1194 * decompress) the data at the candidate duplicate location, comparing it to the data in the agent
1195 * to verify that the candidate is identical to all the data_vios sharing the hash. If so, it can
1196 * be deduplicated against, otherwise a data_vio allocation will have to be written to and used for
1197 * dedupe.
1199 static void start_verifying(struct hash_lock *lock, struct data_vio *agent)
1202 struct vio *vio = &agent->vio;
1203 char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ?
1204 (char *) agent->compression.block :
1205 agent->scratch_block);
1207 lock->state = VDO_HASH_LOCK_VERIFYING;
1208 ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once");
1210 agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION;
1211 result = vio_reset_bio(vio, buffer, verify_endio, REQ_OP_READ,
1212 agent->duplicate.pbn);
1213 if (result != VDO_SUCCESS) {
1214 set_data_vio_hash_zone_callback(agent, finish_verifying);
1215 continue_data_vio_with_error(agent, result);
1219 set_data_vio_bio_zone_callback(agent, vdo_submit_vio);
1220 vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY);
1224 * finish_locking() - Handle the result of the agent for the lock attempting to obtain a PBN read
1225 * lock on the candidate duplicate block.
1226 * @completion: The completion of the data_vio that attempted to get the read lock.
1228 * This continuation is registered in lock_duplicate_pbn().
1230 static void finish_locking(struct vdo_completion *completion)
1232 struct data_vio *agent = as_data_vio(completion);
1233 struct hash_lock *lock = agent->hash_lock;
1235 assert_hash_lock_agent(agent, __func__);
1237 if (!agent->is_duplicate) {
1238 ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1239 "must not hold duplicate_lock if not flagged as a duplicate");
1241 * LOCKING -> WRITING transition: The advice block is being modified or has no
1242 * available references, so try to write or compress the data, remembering to
1243 * update UDS later with the new advice.
1245 increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1246 lock->update_advice = true;
1247 start_writing(lock, agent);
1251 ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
1252 "must hold duplicate_lock if flagged as a duplicate");
1254 if (!lock->verified) {
1256 * LOCKING -> VERIFYING transition: Continue on the unverified dedupe path, reading
1257 * the candidate duplicate and comparing it to the agent's data to decide whether
1258 * it is a true duplicate or stale advice.
1260 start_verifying(lock, agent);
1264 if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1266 * LOCKING -> UNLOCKING transition: The verified block was re-locked, but has no
1267 * available increments left. Must first release the useless PBN read lock before
1268 * rolling over to a new copy of the block.
1270 agent->is_duplicate = false;
1271 lock->verified = false;
1272 lock->update_advice = true;
1273 start_unlocking(lock, agent);
1278 * LOCKING -> DEDUPING transition: Continue on the verified dedupe path, deduplicating
1279 * against a location that was previously verified or written to.
1281 start_deduping(lock, agent, false);
1284 static bool acquire_provisional_reference(struct data_vio *agent, struct pbn_lock *lock,
1285 struct slab_depot *depot)
1287 /* Ensure that the newly-locked block is referenced. */
1288 struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn);
1289 int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock);
1291 if (result == VDO_SUCCESS)
1294 uds_log_warning_strerror(result,
1295 "Error acquiring provisional reference for dedupe candidate; aborting dedupe");
1296 agent->is_duplicate = false;
1297 vdo_release_physical_zone_pbn_lock(agent->duplicate.zone,
1298 agent->duplicate.pbn, lock);
1299 continue_data_vio_with_error(agent, result);
1304 * lock_duplicate_pbn() - Acquire a read lock on the PBN of the block containing candidate
1305 * duplicate data (compressed or uncompressed).
1306 * @completion: The completion of the data_vio attempting to acquire the physical block lock on
1307 * behalf of its hash lock.
1309 * If the PBN is already locked for writing, the lock attempt is abandoned and is_duplicate will be
1310 * cleared before calling back. This continuation is launched from start_locking(), and calls back
1311 * to finish_locking() on the hash zone thread.
1313 static void lock_duplicate_pbn(struct vdo_completion *completion)
1315 unsigned int increment_limit;
1316 struct pbn_lock *lock;
1319 struct data_vio *agent = as_data_vio(completion);
1320 struct slab_depot *depot = vdo_from_data_vio(agent)->depot;
1321 struct physical_zone *zone = agent->duplicate.zone;
1323 assert_data_vio_in_duplicate_zone(agent);
1325 set_data_vio_hash_zone_callback(agent, finish_locking);
1328 * While in the zone that owns it, find out how many additional references can be made to
1329 * the block if it turns out to truly be a duplicate.
1331 increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn);
1332 if (increment_limit == 0) {
1334 * We could deduplicate against it later if a reference happened to be released
1335 * during verification, but it's probably better to bail out now.
1337 agent->is_duplicate = false;
1338 continue_data_vio(agent);
1342 result = vdo_attempt_physical_zone_pbn_lock(zone, agent->duplicate.pbn,
1343 VIO_READ_LOCK, &lock);
1344 if (result != VDO_SUCCESS) {
1345 continue_data_vio_with_error(agent, result);
1349 if (!vdo_is_pbn_read_lock(lock)) {
1351 * There are three cases of write locks: uncompressed data block writes, compressed
1352 * (packed) block writes, and block map page writes. In all three cases, we give up
1353 * on trying to verify the advice and don't bother to try to deduplicate against the
1354 * data in the write lock holder.
1356 * 1) We don't ever want to try to deduplicate against a block map page.
1358 * 2a) It's very unlikely we'd deduplicate against an entire packed block, both
1359 * because of the chance of matching it, and because we don't record advice for it,
1360 * but for the uncompressed representation of all the fragments it contains. The
1361 * only way we'd be getting lock contention is if we've written the same
1362 * representation coincidentally before, had it become unreferenced, and it just
1363 * happened to be packed together from compressed writes when we go to verify the
1364 * lucky advice. Giving up is a minuscule loss of potential dedupe.
1366 * 2b) If the advice is for a slot of a compressed block, it's about to get
1367 * smashed, and the write smashing it cannot contain our data--it would have to be
1368 * writing on behalf of our hash lock, but that's impossible since we're the lock
1369 * agent.
1371 * 3a) If the lock is held by a data_vio with different data, the advice is already
1372 * stale or is about to become stale.
1374 * 3b) If the lock is held by a data_vio that matches us, we may as well either
1375 * write it ourselves (or reference the copy we already wrote) instead of
1376 * potentially having many duplicates wait for the lock holder to write, journal,
1377 * hash, and finally arrive in the hash lock. We lose a chance to avoid a UDS
1378 * update in the very rare case of advice for a free block that just happened to be
1379 * allocated to a data_vio with the same hash. There's also a chance to save on a
1380 * block write, at the cost of a block verify. Saving on a full block compare in
1381 * all stale advice cases almost certainly outweighs saving a UDS update and
1382 * trading a write for a read in a lucky case where advice would have been saved
1383 * from becoming stale.
1385 agent->is_duplicate = false;
1386 continue_data_vio(agent);
1390 if (lock->holder_count == 0) {
1391 if (!acquire_provisional_reference(agent, lock, depot))
1395 * The increment limit we grabbed earlier is still valid. The lock now holds the
1396 * rights to acquire all those references. Those rights will be claimed by hash
1397 * locks sharing this read lock.
1399 lock->increment_limit = increment_limit;
1403 * We've successfully acquired a read lock on behalf of the hash lock, so mark it as such.
1405 set_duplicate_lock(agent->hash_lock, lock);
1408 * TODO: Optimization: We could directly launch the block verify, then switch to a hash
1409 * thread.
1411 continue_data_vio(agent);
1415 * start_locking() - Continue deduplication for a hash lock that has obtained valid advice of a
1416 * potential duplicate through its agent.
1417 * @lock: The hash lock (currently must be QUERYING).
1418 * @agent: The data_vio bearing the dedupe advice.
1420 static void start_locking(struct hash_lock *lock, struct data_vio *agent)
1422 ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1423 "must not acquire a duplicate lock when already holding it");
1425 lock->state = VDO_HASH_LOCK_LOCKING;
1428 * TODO: Optimization: If we arrange to continue on the duplicate zone thread when
1429 * accepting the advice, and don't explicitly change lock states (or use an agent-local
1430 * state, or an atomic), we can avoid a thread transition here.
1432 agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN;
1433 launch_data_vio_duplicate_zone_callback(agent, lock_duplicate_pbn);
1437 * finish_writing() - Re-entry point for the lock agent after it has finished writing or
1438 * compressing its copy of the data block.
1439 * @lock: The hash lock, which must be in state WRITING.
1440 * @agent: The data_vio that wrote its data for the lock.
1442 * The agent will never need to dedupe against anything, so it's done with the lock, but the lock
1443 * may not be finished with it, as a UDS update might still be needed.
1445 * If there are other lock holders, the agent will hand the job to one of them and exit, leaving
1446 * the lock to deduplicate against the just-written block. If there are no other lock holders, the
1447 * agent either exits (and later tears down the hash lock), or it remains the agent and updates
1448 * UDS.
1450 static void finish_writing(struct hash_lock *lock, struct data_vio *agent)
1453 * Dedupe against the data block or compressed block slot the agent wrote. Since we know
1454 * the write succeeded, there's no need to verify it.
1456 lock->duplicate = agent->new_mapped;
1457 lock->verified = true;
1459 if (vdo_is_state_compressed(lock->duplicate.state) &&
1462 * Compression means the location we gave in the UDS query is not the location
1463 * we're using to deduplicate.
1465 lock->update_advice = true;
1467 /* If there are any waiters, we need to start deduping them. */
1468 if (vdo_waitq_has_waiters(&lock->waiters)) {
1470 * WRITING -> DEDUPING transition: an asynchronously-written block failed to
1471 * compress, so the PBN lock on the written copy was already transferred. The agent
1472 * is done with the lock, but the lock may still need to use it to clean up after
1475 start_deduping(lock, agent, true);
1480 * There are no waiters and the agent has successfully written, so take a step towards
1481 * being able to release the hash lock (or just release it).
1483 if (lock->update_advice) {
1485 * WRITING -> UPDATING transition: There's no waiter and a UDS update is needed, so
1486 * retain the WRITING agent and use it to launch the update. This happens on
1487 * compression, rollover, or the QUERYING agent not having an allocation.
1489 start_updating(lock, agent);
1490 } else if (lock->duplicate_lock != NULL) {
1492 * WRITING -> UNLOCKING transition: There's no waiter and no update needed, but the
1493 * compressed write gave us a shared duplicate lock that we must release.
1495 set_duplicate_location(agent, lock->duplicate);
1496 start_unlocking(lock, agent);
1499 * WRITING -> BYPASSING transition: There's no waiter, no update needed, and no
1500 * duplicate lock held, so both the agent and lock have no more work to do. The
1501 * agent will release its allocation lock in cleanup.
1503 start_bypassing(lock, agent);
1508 * select_writing_agent() - Search through the lock waiters for a data_vio that has an allocation.
1509 * @lock: The hash lock to modify.
1511 * If an allocation is found, swap agents, put the old agent at the head of the wait queue, then
1512 * return the new agent. Otherwise, just return the current agent.
1514 static struct data_vio *select_writing_agent(struct hash_lock *lock)
1516 struct vdo_wait_queue temp_queue;
1517 struct data_vio *data_vio;
1519 vdo_waitq_init(&temp_queue);
1522 * Move waiters to the temp queue one-by-one until we find an allocation. Not ideal to
1523 * search, but it only happens when nearly out of space.
1525 while (((data_vio = dequeue_lock_waiter(lock)) != NULL) &&
1526 !data_vio_has_allocation(data_vio)) {
1527 /* Use the lower-level enqueue since we're just moving waiters around. */
1528 vdo_waitq_enqueue_waiter(&temp_queue, &data_vio->waiter);
1531 if (data_vio != NULL) {
1533 * Move the rest of the waiters over to the temp queue, preserving the order they
1534 * arrived at the lock.
1536 vdo_waitq_transfer_all_waiters(&lock->waiters, &temp_queue);
1539 * The current agent is being replaced and will have to wait to dedupe; make it the
1540 * first waiter since it was the first to reach the lock.
1542 vdo_waitq_enqueue_waiter(&lock->waiters, &lock->agent->waiter);
1543 lock->agent = data_vio;
1545 /* No one has an allocation, so keep the current agent. */
1546 data_vio = lock->agent;
1549 /* Swap all the waiters back onto the lock's queue. */
1550 vdo_waitq_transfer_all_waiters(&temp_queue, &lock->waiters);
1555 * start_writing() - Begin the non-duplicate write path.
1556 * @lock: The hash lock (currently must be QUERYING).
1557 * @agent: The data_vio currently acting as the agent for the lock.
1559 * Begins the non-duplicate write path for a hash lock that had no advice, selecting a data_vio
1560 * with an allocation as a new agent, if necessary, then resuming the agent on the data_vio write
1561 * path.
1563 static void start_writing(struct hash_lock *lock, struct data_vio *agent)
1565 lock->state = VDO_HASH_LOCK_WRITING;
1568 * The agent might not have received an allocation and so can't be used for writing, but
1569 * it's entirely possible that one of the waiters did.
1571 if (!data_vio_has_allocation(agent)) {
1572 agent = select_writing_agent(lock);
1573 /* If none of the waiters had an allocation, the writes all have to fail. */
1574 if (!data_vio_has_allocation(agent)) {
1576 * TODO: Should we keep a variant of BYPASSING that causes new arrivals to
1577 * fail immediately if they don't have an allocation? It might be possible
1578 * that on some path there would be non-waiters still referencing the lock,
1579 * so it would remain in the map as everything is currently spelled, even
1580 * if the agent and all waiters release.
1582 continue_data_vio_with_error(agent, VDO_NO_SPACE);
1588 * If the agent compresses, it might wait indefinitely in the packer, which would be bad if
1589 * there are any other data_vios waiting.
1591 if (vdo_waitq_has_waiters(&lock->waiters))
1592 cancel_data_vio_compression(agent);
1595 * Send the agent to the compress/pack/write path in vioWrite. If it succeeds, it will
1596 * return to the hash lock via vdo_continue_hash_lock() and call finish_writing().
1598 launch_compress_data_vio(agent);
1602 * Decode VDO duplicate advice from the old_metadata field of a UDS request.
1603 * Returns true if valid advice was found and decoded
1605 static bool decode_uds_advice(struct dedupe_context *context)
1607 const struct uds_request *request = &context->request;
1608 struct data_vio *data_vio = context->requestor;
1610 const struct uds_record_data *encoding = &request->old_metadata;
1611 struct vdo *vdo = vdo_from_data_vio(data_vio);
1612 struct zoned_pbn *advice = &data_vio->duplicate;
1616 if ((request->status != UDS_SUCCESS) || !request->found)
1619 version = encoding->data[offset++];
1620 if (version != UDS_ADVICE_VERSION) {
1621 uds_log_error("invalid UDS advice version code %u", version);
1625 advice->state = encoding->data[offset++];
1626 advice->pbn = get_unaligned_le64(&encoding->data[offset]);
1627 offset += sizeof(u64);
1628 BUG_ON(offset != UDS_ADVICE_SIZE);
1630 /* Don't use advice that's clearly meaningless. */
1631 if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) {
1632 uds_log_debug("Invalid advice from deduplication server: pbn %llu, state %u. Giving up on deduplication of logical block %llu",
1633 (unsigned long long) advice->pbn, advice->state,
1634 (unsigned long long) data_vio->logical.lbn);
1635 atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1639 result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone);
1640 if ((result != VDO_SUCCESS) || (advice->zone == NULL)) {
1641 uds_log_debug("Invalid physical block number from deduplication server: %llu, giving up on deduplication of logical block %llu",
1642 (unsigned long long) advice->pbn,
1643 (unsigned long long) data_vio->logical.lbn);
1644 atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1651 static void process_query_result(struct data_vio *agent)
1653 struct dedupe_context *context = agent->dedupe_context;
1655 if (context == NULL)
1658 if (change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE)) {
1659 agent->is_duplicate = decode_uds_advice(context);
1660 release_context(context);
1665 * finish_querying() - Process the result of a UDS query performed by the agent for the lock.
1666 * @completion: The completion of the data_vio that performed the query.
1668 * This continuation is registered in start_querying().
1670 static void finish_querying(struct vdo_completion *completion)
1672 struct data_vio *agent = as_data_vio(completion);
1673 struct hash_lock *lock = agent->hash_lock;
1675 assert_hash_lock_agent(agent, __func__);
1677 process_query_result(agent);
1679 if (agent->is_duplicate) {
1680 lock->duplicate = agent->duplicate;
1682 * QUERYING -> LOCKING transition: Valid advice was obtained from UDS. Use the
1683 * QUERYING agent to start the hash lock on the unverified dedupe path, verifying
1684 * that the advice can be used.
1686 start_locking(lock, agent);
1689 * The agent will be used as the duplicate if it has an allocation; if it does, that
1690 * location was posted to UDS, so no update will be needed.
1692 lock->update_advice = !data_vio_has_allocation(agent);
1694 * QUERYING -> WRITING transition: There was no advice or the advice wasn't valid,
1695 * so try to write or compress the data.
1697 start_writing(lock, agent);
1702 * start_querying() - Start deduplication for a hash lock.
1703 * @lock: The initialized hash lock.
1704 * @data_vio: The data_vio that has just obtained the new lock.
1706 * Starts deduplication for a hash lock that has finished initializing by making the data_vio that
1707 * requested it the agent, entering the QUERYING state, and using the agent to perform the UDS
1708 * query on behalf of the lock.
1710 static void start_querying(struct hash_lock *lock, struct data_vio *data_vio)
1712 lock->agent = data_vio;
1713 lock->state = VDO_HASH_LOCK_QUERYING;
1714 data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION;
1715 set_data_vio_hash_zone_callback(data_vio, finish_querying);
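	/*
	 * A data_vio which already has an allocation posts that location as advice while it
	 * queries; one without an allocation only asks for existing advice. finish_querying()
	 * relies on this distinction: a posted location means no separate UDS update is needed
	 * later.
	 */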
1716 query_index(data_vio,
1717 (data_vio_has_allocation(data_vio) ? UDS_POST : UDS_QUERY));
1721 * report_bogus_lock_state() - Complain that a data_vio has entered a hash_lock that is in an
1722 * unimplemented or unusable state and continue the data_vio with an error.
1724 * @lock: The hash lock.
1725 * @data_vio: The data_vio attempting to enter the lock.
1727 static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio)
1729 ASSERT_LOG_ONLY(false, "hash lock must not be in unimplemented state %s",
1730 get_hash_lock_state_name(lock->state));
1731 continue_data_vio_with_error(data_vio, VDO_LOCK_ERROR);
1735 * vdo_continue_hash_lock() - Continue the processing state after writing, compressing, or deduplicating.
1737 * @data_vio: The data_vio to continue processing in its hash lock.
1739 * Asynchronously continue processing a data_vio in its hash lock after it has finished writing,
1740 * compressing, or deduplicating, so it can share the result with any data_vios waiting in the hash
1741 * lock, or update the UDS index, or simply release its share of the lock.
1743 * Context: This must only be called in the correct thread for the hash zone.
1745 void vdo_continue_hash_lock(struct vdo_completion *completion)
1747 struct data_vio *data_vio = as_data_vio(completion);
1748 struct hash_lock *lock = data_vio->hash_lock;
1750 switch (lock->state) {
1751 case VDO_HASH_LOCK_WRITING:
1752 ASSERT_LOG_ONLY(data_vio == lock->agent,
1753 "only the lock agent may continue the lock");
1754 finish_writing(lock, data_vio);
1757 case VDO_HASH_LOCK_DEDUPING:
1758 finish_deduping(lock, data_vio);
1761 case VDO_HASH_LOCK_BYPASSING:
1762 /* This data_vio has finished the write path and the lock doesn't need it. */
1763 exit_hash_lock(data_vio);
1766 case VDO_HASH_LOCK_INITIALIZING:
1767 case VDO_HASH_LOCK_QUERYING:
1768 case VDO_HASH_LOCK_UPDATING:
1769 case VDO_HASH_LOCK_LOCKING:
1770 case VDO_HASH_LOCK_VERIFYING:
1771 case VDO_HASH_LOCK_UNLOCKING:
1772 /* A lock in this state should never be re-entered. */
1773 report_bogus_lock_state(lock, data_vio);
1777 report_bogus_lock_state(lock, data_vio);
1782 * is_hash_collision() - Check to see if a hash collision has occurred.
1783 * @lock: The lock to check.
1784 * @candidate: The data_vio seeking to share the lock.
1786 * Check whether the data in data_vios sharing a lock is different than in a data_vio seeking to
1787 * share the lock, which should only be possible in the extremely unlikely case of a hash collision.
1790 * Return: true if the given data_vio must not share the lock because it doesn't have the same data
1791 * as the lock holders.
1793 static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate)
1795 struct data_vio *lock_holder;
1796 struct hash_zone *zone;
1799 if (list_empty(&lock->duplicate_ring))
1802 lock_holder = list_first_entry(&lock->duplicate_ring, struct data_vio,
1804 zone = candidate->hash_zone;
1805 collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data);
1807 increment_stat(&zone->statistics.concurrent_hash_collisions);
1809 increment_stat(&zone->statistics.concurrent_data_matches);
1814 static inline int assert_hash_lock_preconditions(const struct data_vio *data_vio)
1818 /* FIXME: BUG_ON() and/or enter read-only mode? */
1819 result = ASSERT(data_vio->hash_lock == NULL,
1820 "must not already hold a hash lock");
1821 if (result != VDO_SUCCESS)
1824 result = ASSERT(list_empty(&data_vio->hash_lock_entry),
1825 "must not already be a member of a hash lock ring");
1826 if (result != VDO_SUCCESS)
1829 return ASSERT(data_vio->recovery_sequence_number == 0,
1830 "must not hold a recovery lock when getting a hash lock");
1834 * vdo_acquire_hash_lock() - Acquire or share a lock on a record name.
1835 * @data_vio: The data_vio acquiring a lock on its record name.
1837 * Acquire or share a lock on the hash (record name) of the data in a data_vio, updating the
1838 * data_vio to reference the lock. This must only be called in the correct thread for the zone. In
1839 * the unlikely case of a hash collision, this function will succeed, but the data_vio will not get a reference to the hash lock.
1842 void vdo_acquire_hash_lock(struct vdo_completion *completion)
1844 struct data_vio *data_vio = as_data_vio(completion);
1845 struct hash_lock *lock;
1848 assert_data_vio_in_hash_zone(data_vio);
1850 result = assert_hash_lock_preconditions(data_vio);
1851 if (result != VDO_SUCCESS) {
1852 continue_data_vio_with_error(data_vio, result);
1856 result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock);
1857 if (result != VDO_SUCCESS) {
1858 continue_data_vio_with_error(data_vio, result);
1862 if (is_hash_collision(lock, data_vio)) {
1864 * Hash collisions are extremely unlikely, but the resulting bogus dedupe would cause data
1865 * corruption. Bypass optimization entirely. We can't compress a data_vio without
1866 * a hash_lock as the compressed write depends on the hash_lock to manage the
1867 * references for the compressed block.
1869 write_data_vio(data_vio);
1873 set_hash_lock(data_vio, lock);
1874 switch (lock->state) {
1875 case VDO_HASH_LOCK_INITIALIZING:
1876 start_querying(lock, data_vio);
1879 case VDO_HASH_LOCK_QUERYING:
1880 case VDO_HASH_LOCK_WRITING:
1881 case VDO_HASH_LOCK_UPDATING:
1882 case VDO_HASH_LOCK_LOCKING:
1883 case VDO_HASH_LOCK_VERIFYING:
1884 case VDO_HASH_LOCK_UNLOCKING:
1885 /* The lock is busy, and can't be shared yet. */
1886 wait_on_hash_lock(lock, data_vio);
1889 case VDO_HASH_LOCK_BYPASSING:
1890 /* We can't use this lock, so bypass optimization entirely. */
1891 vdo_release_hash_lock(data_vio);
1892 write_data_vio(data_vio);
1895 case VDO_HASH_LOCK_DEDUPING:
1896 launch_dedupe(lock, data_vio, false);
1900 /* A lock in this state should not be acquired by new VIOs. */
1901 report_bogus_lock_state(lock, data_vio);
1906 * vdo_release_hash_lock() - Release a data_vio's share of a hash lock, if held, and null out the
1907 * data_vio's reference to it.
1908 * @data_vio: The data_vio releasing its hash lock.
1910 * If the data_vio is the only one holding the lock, this also releases any resources or locks used
1911 * by the hash lock (such as a PBN read lock on a block containing data with the same hash) and
1912 * returns the lock to the hash zone's lock pool.
1914 * Context: This must only be called in the correct thread for the hash zone.
1916 void vdo_release_hash_lock(struct data_vio *data_vio)
1919 struct hash_lock *lock = data_vio->hash_lock;
1920 struct hash_zone *zone = data_vio->hash_zone;
1925 set_hash_lock(data_vio, NULL);
1927 if (lock->reference_count > 0) {
1928 /* The lock is still in use by other data_vios. */
1932 lock_key = hash_lock_key(lock);
1933 if (lock->registered) {
1934 struct hash_lock *removed;
1936 removed = vdo_int_map_remove(zone->hash_lock_map, lock_key);
1937 ASSERT_LOG_ONLY(lock == removed,
1938 "hash lock being released must have been mapped");
1940 ASSERT_LOG_ONLY(lock != vdo_int_map_get(zone->hash_lock_map, lock_key),
1941 "unregistered hash lock must not be in the lock map");
1944 ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
1945 "hash lock returned to zone must have no waiters");
1946 ASSERT_LOG_ONLY((lock->duplicate_lock == NULL),
1947 "hash lock returned to zone must not reference a PBN lock");
1948 ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING),
1949 "returned hash lock must not be in use with state %s",
1950 get_hash_lock_state_name(lock->state));
1951 ASSERT_LOG_ONLY(list_empty(&lock->pool_node),
1952 "hash lock returned to zone must not be in a pool ring");
1953 ASSERT_LOG_ONLY(list_empty(&lock->duplicate_ring),
1954 "hash lock returned to zone must not reference DataVIOs");
1956 return_hash_lock_to_pool(zone, lock);
1960 * transfer_allocation_lock() - Transfer a data_vio's downgraded allocation PBN lock to the
1961 * data_vio's hash lock, converting it to a duplicate PBN lock.
1962 * @data_vio: The data_vio holding the allocation lock to transfer.
1964 static void transfer_allocation_lock(struct data_vio *data_vio)
1966 struct allocation *allocation = &data_vio->allocation;
1967 struct hash_lock *hash_lock = data_vio->hash_lock;
1969 ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn,
1970 "transferred lock must be for the block written");
1972 allocation->pbn = VDO_ZERO_BLOCK;
1974 ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock),
1975 "must have downgraded the allocation lock before transfer");
1977 hash_lock->duplicate = data_vio->new_mapped;
1978 data_vio->duplicate = data_vio->new_mapped;
1981 * Since the lock is being transferred, the holder count doesn't change (and isn't even
1982 * safe to examine on this thread).
1984 hash_lock->duplicate_lock = uds_forget(allocation->lock);
1988 * vdo_share_compressed_write_lock() - Make a data_vio's hash lock a shared holder of the PBN lock
1989 * on the compressed block to which its data was just written.
1990 * @data_vio: The data_vio which was just compressed.
1991 * @pbn_lock: The PBN lock on the compressed block.
1993 * If the lock is still a write lock (as it will be for the first share), it will be converted to a
1994 * read lock. This also reserves a reference count increment for the data_vio.
1996 void vdo_share_compressed_write_lock(struct data_vio *data_vio,
1997 struct pbn_lock *pbn_lock)
2001 ASSERT_LOG_ONLY(vdo_get_duplicate_lock(data_vio) == NULL,
2002 "a duplicate PBN lock should not exist when writing");
2003 ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state),
2004 "lock transfer must be for a compressed write");
2005 assert_data_vio_in_new_mapped_zone(data_vio);
2007 /* First sharer downgrades the lock. */
2008 if (!vdo_is_pbn_read_lock(pbn_lock))
2009 vdo_downgrade_pbn_write_lock(pbn_lock, true);
2012 * Get a share of the PBN lock, ensuring it cannot be released until after this data_vio
2013 * has had a chance to journal a reference.
2015 data_vio->duplicate = data_vio->new_mapped;
2016 data_vio->hash_lock->duplicate = data_vio->new_mapped;
2017 set_duplicate_lock(data_vio->hash_lock, pbn_lock);
2020 * Claim a reference for this data_vio. This is necessary since another hash_lock might start
2021 * deduplicating against it before our own reference count increment.
2023 claimed = vdo_claim_pbn_lock_increment(pbn_lock);
2024 ASSERT_LOG_ONLY(claimed, "impossible to fail to claim an initial increment");
2027 static void dedupe_kobj_release(struct kobject *directory)
2029 uds_free(container_of(directory, struct hash_zones, dedupe_directory));
2032 static ssize_t dedupe_status_show(struct kobject *directory, struct attribute *attr,
2035 struct uds_attribute *ua = container_of(attr, struct uds_attribute, attr);
2036 struct hash_zones *zones = container_of(directory, struct hash_zones,
2039 if (ua->show_string != NULL)
2040 return sprintf(buf, "%s\n", ua->show_string(zones));
2045 static ssize_t dedupe_status_store(struct kobject *kobj __always_unused,
2046 struct attribute *attr __always_unused,
2047 const char *buf __always_unused,
2048 size_t length __always_unused)
2053 /*----------------------------------------------------------------------*/
2055 static const struct sysfs_ops dedupe_sysfs_ops = {
2056 .show = dedupe_status_show,
2057 .store = dedupe_status_store,
2060 static struct uds_attribute dedupe_status_attribute = {
2061 .attr = {.name = "status", .mode = 0444, },
2062 .show_string = vdo_get_dedupe_index_state_name,
2065 static struct attribute *dedupe_attrs[] = {
2066 &dedupe_status_attribute.attr,
2069 ATTRIBUTE_GROUPS(dedupe);
2071 static const struct kobj_type dedupe_directory_type = {
2072 .release = dedupe_kobj_release,
2073 .sysfs_ops = &dedupe_sysfs_ops,
2074 .default_groups = dedupe_groups,
2077 static void start_uds_queue(void *ptr)
2080 * Allow the UDS dedupe worker thread to do memory allocations. It will only do allocations
2081 * during the UDS calls that open or close an index, but those allocations can safely sleep
2082 * while reserving a large amount of memory. We could use an allocations_allowed boolean
2083 * (like the base threads do), but it would be an unnecessary embellishment.
2085 struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
2087 uds_register_allocating_thread(&thread->allocating_thread, NULL);
2090 static void finish_uds_queue(void *ptr __always_unused)
2092 uds_unregister_allocating_thread();
2095 static void close_index(struct hash_zones *zones)
2100 * Change the index state so that get_index_statistics() will not try to use the index
2101 * session we are closing.
2103 zones->index_state = IS_CHANGING;
2104 /* Close the index session, while not holding the lock. */
2105 spin_unlock(&zones->lock);
2106 result = uds_close_index(zones->index_session);
2108 if (result != UDS_SUCCESS)
2109 uds_log_error_strerror(result, "Error closing index");
2110 spin_lock(&zones->lock);
2111 zones->index_state = IS_CLOSED;
2112 zones->error_flag |= result != UDS_SUCCESS;
2113 /* ASSERTION: We leave in IS_CLOSED state. */
2116 static void open_index(struct hash_zones *zones)
2118 /* ASSERTION: We enter in IS_CLOSED state. */
2120 bool create_flag = zones->create_flag;
2122 zones->create_flag = false;
2124 * Change the index state so that it will be reported to the outside world as "opening".
2127 zones->index_state = IS_CHANGING;
2128 zones->error_flag = false;
2130 /* Open the index session, while not holding the lock */
2131 spin_unlock(&zones->lock);
2132 result = uds_open_index(create_flag ? UDS_CREATE : UDS_LOAD,
2133 &zones->parameters, zones->index_session);
2134 if (result != UDS_SUCCESS)
2135 uds_log_error_strerror(result, "Error opening index");
2137 spin_lock(&zones->lock);
2142 * Either there is no index, or there is no way we can recover the index.
2143 * We will be called again and try to create a new index.
2145 zones->index_state = IS_CLOSED;
2146 zones->create_flag = true;
2152 if (result == UDS_SUCCESS) {
2153 zones->index_state = IS_OPENED;
2155 zones->index_state = IS_CLOSED;
2156 zones->index_target = IS_CLOSED;
2157 zones->error_flag = true;
2158 spin_unlock(&zones->lock);
2159 uds_log_info("Setting UDS index target state to error");
2160 spin_lock(&zones->lock);
2163 * ASSERTION: On success, we leave in IS_OPENED state.
2164 * ASSERTION: On failure, we leave in IS_CLOSED state.
2168 static void change_dedupe_state(struct vdo_completion *completion)
2170 struct hash_zones *zones = as_hash_zones(completion);
2172 spin_lock(&zones->lock);
2174 /* Loop until the index is in the target state and the create flag is clear. */
2175 while (vdo_is_state_normal(&zones->state) &&
2176 ((zones->index_state != zones->index_target) || zones->create_flag)) {
2177 if (zones->index_state == IS_OPENED)
2183 zones->changing = false;
2184 spin_unlock(&zones->lock);
2187 static void start_expiration_timer(struct dedupe_context *context)
2189 u64 start_time = context->submission_jiffies;
2192 if (!change_timer_state(context->zone, DEDUPE_QUERY_TIMER_IDLE,
2193 DEDUPE_QUERY_TIMER_RUNNING))
2196 end_time = max(start_time + vdo_dedupe_index_timeout_jiffies,
2197 jiffies + vdo_dedupe_index_min_timer_jiffies);
2198 mod_timer(&context->zone->timer, end_time);
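/*
 * A worked example of the arming formula above, with illustrative (not default) values: if the
 * index timeout corresponds to 5000 ms and the minimum timer interval to 100 ms, a request
 * submitted 4950 ms ago would nominally expire in 50 ms, but the max() pushes the timer out to
 * 100 ms from now, so expiration processing never runs more often than the minimum interval
 * allows.
 */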
2202 * report_dedupe_timeouts() - Record and eventually report that some dedupe requests reached their
2203 * expiration time without getting answers, so we timed them out.
2204 * @zones: the hash zones.
2205 * @timeouts: the number of newly timed out requests.
2207 static void report_dedupe_timeouts(struct hash_zones *zones, unsigned int timeouts)
2209 atomic64_add(timeouts, &zones->timeouts);
2210 spin_lock(&zones->lock);
2211 if (__ratelimit(&zones->ratelimiter)) {
2212 u64 unreported = atomic64_read(&zones->timeouts);
2214 unreported -= zones->reported_timeouts;
2215 uds_log_debug("UDS index timeout on %llu requests",
2216 (unsigned long long) unreported);
2217 zones->reported_timeouts += unreported;
2219 spin_unlock(&zones->lock);
2222 static int initialize_index(struct vdo *vdo, struct hash_zones *zones)
2226 struct volume_geometry geometry = vdo->geometry;
2227 static const struct vdo_work_queue_type uds_queue_type = {
2228 .start = start_uds_queue,
2229 .finish = finish_uds_queue,
2230 .max_priority = UDS_Q_MAX_PRIORITY,
2231 .default_priority = UDS_Q_PRIORITY,
2234 vdo_set_dedupe_index_timeout_interval(vdo_dedupe_index_timeout_interval);
2235 vdo_set_dedupe_index_min_timer_interval(vdo_dedupe_index_min_timer_interval);
2238 * Timeouts suppressed by the ratelimiter are accumulated and reported later (see
2239 * report_dedupe_timeouts()), so the ratelimiter itself does not need to log that it dropped messages.
2241 ratelimit_default_init(&zones->ratelimiter);
2242 ratelimit_set_flags(&zones->ratelimiter, RATELIMIT_MSG_ON_RELEASE);
2243 uds_offset = ((vdo_get_index_region_start(geometry) -
2244 geometry.bio_offset) * VDO_BLOCK_SIZE);
2245 zones->parameters = (struct uds_parameters) {
2246 .bdev = vdo->device_config->owned_device->bdev,
2247 .offset = uds_offset,
2248 .size = (vdo_get_index_region_size(geometry) * VDO_BLOCK_SIZE),
2249 .memory_size = geometry.index_config.mem,
2250 .sparse = geometry.index_config.sparse,
2251 .nonce = (u64) geometry.nonce,
2254 result = uds_create_index_session(&zones->index_session);
2255 if (result != UDS_SUCCESS)
2258 result = vdo_make_thread(vdo, vdo->thread_config.dedupe_thread, &uds_queue_type,
2260 if (result != VDO_SUCCESS) {
2261 uds_destroy_index_session(uds_forget(zones->index_session));
2262 uds_log_error("UDS index queue initialization failed (%d)", result);
2266 vdo_initialize_completion(&zones->completion, vdo, VDO_HASH_ZONES_COMPLETION);
2267 vdo_set_completion_callback(&zones->completion, change_dedupe_state,
2268 vdo->thread_config.dedupe_thread);
2269 kobject_init(&zones->dedupe_directory, &dedupe_directory_type);
2274 * finish_index_operation() - This is the UDS callback for index queries.
2275 * @request: The uds request which has just completed.
2277 static void finish_index_operation(struct uds_request *request)
2279 struct dedupe_context *context = container_of(request, struct dedupe_context,
2282 if (change_context_state(context, DEDUPE_CONTEXT_PENDING,
2283 DEDUPE_CONTEXT_COMPLETE)) {
2285 * This query has not timed out, so send its data_vio back to its hash zone to
2286 * process the results.
2288 continue_data_vio(context->requestor);
2293 * This query has timed out, so try to mark it complete and hence eligible for reuse. Its
2294 * data_vio has already moved on.
2296 if (!change_context_state(context, DEDUPE_CONTEXT_TIMED_OUT,
2297 DEDUPE_CONTEXT_TIMED_OUT_COMPLETE))
2298 ASSERT_LOG_ONLY(false, "uds request was timed out (state %d)",
2299 atomic_read(&context->state));
2301 uds_funnel_queue_put(context->zone->timed_out_complete, &context->queue_entry);
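/*
 * Taken together with query_index(), process_query_result(), and
 * timeout_index_operations_callback(), the dedupe_context states move roughly as follows:
 *
 *   IDLE -> PENDING                    query_index() submits the request
 *   PENDING -> COMPLETE                the index answers before the timeout
 *   COMPLETE -> IDLE                   process_query_result() consumes the answer
 *   PENDING -> TIMED_OUT               the timeout fires first; the requestor moves on
 *   TIMED_OUT -> TIMED_OUT_COMPLETE    the index answers late (the branch above)
 *   TIMED_OUT_COMPLETE -> reuse        the context is recycled by acquire_context() or
 *                                      during drain in check_for_drain_complete()
 */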
2305 * check_for_drain_complete() - Check whether this zone has drained.
2306 * @zone: The zone to check.
2308 static void check_for_drain_complete(struct hash_zone *zone)
2310 data_vio_count_t recycled = 0;
2312 if (!vdo_is_state_draining(&zone->state))
2315 if ((atomic_read(&zone->timer_state) == DEDUPE_QUERY_TIMER_IDLE) ||
2316 change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2317 DEDUPE_QUERY_TIMER_IDLE)) {
2318 del_timer_sync(&zone->timer);
2321 * There is an in-flight time-out, which must get processed before we can continue.
2327 struct dedupe_context *context;
2328 struct funnel_queue_entry *entry;
2330 entry = uds_funnel_queue_poll(zone->timed_out_complete);
2334 context = container_of(entry, struct dedupe_context, queue_entry);
2335 atomic_set(&context->state, DEDUPE_CONTEXT_IDLE);
2336 list_add(&context->list_entry, &zone->available);
2341 WRITE_ONCE(zone->active, zone->active - recycled);
2342 ASSERT_LOG_ONLY(READ_ONCE(zone->active) == 0, "all contexts inactive");
2343 vdo_finish_draining(&zone->state);
2346 static void timeout_index_operations_callback(struct vdo_completion *completion)
2348 struct dedupe_context *context, *tmp;
2349 struct hash_zone *zone = as_hash_zone(completion);
2350 u64 timeout_jiffies = msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
2351 unsigned long cutoff = jiffies - timeout_jiffies;
2352 unsigned int timed_out = 0;
2354 atomic_set(&zone->timer_state, DEDUPE_QUERY_TIMER_IDLE);
2355 list_for_each_entry_safe(context, tmp, &zone->pending, list_entry) {
2356 if (cutoff <= context->submission_jiffies) {
2358 * We have reached the oldest query which has not timed out yet, so restart the expiration timer.
2361 start_expiration_timer(context);
2365 if (!change_context_state(context, DEDUPE_CONTEXT_PENDING,
2366 DEDUPE_CONTEXT_TIMED_OUT)) {
2368 * This context completed between the time the timeout fired and now. We can
2369 * treat it as a successful query; its requestor has already been continued.
2376 * Remove this context from the pending list so we won't look at it again on a
2377 * subsequent timeout. Once the index completes it, it will be reused. Meanwhile,
2378 * send its requestor on its way.
2380 list_del_init(&context->list_entry);
2381 continue_data_vio(context->requestor);
2386 report_dedupe_timeouts(completion->vdo->hash_zones, timed_out);
2388 check_for_drain_complete(zone);
2391 static void timeout_index_operations(struct timer_list *t)
2393 struct hash_zone *zone = from_timer(zone, t, timer);
2395 if (change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2396 DEDUPE_QUERY_TIMER_FIRED))
2397 vdo_launch_completion(&zone->completion);
2400 static int __must_check initialize_zone(struct vdo *vdo, struct hash_zones *zones,
2401 zone_count_t zone_number)
2405 struct hash_zone *zone = &zones->zones[zone_number];
2407 result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->hash_lock_map);
2408 if (result != VDO_SUCCESS)
2411 vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2412 zone->zone_number = zone_number;
2413 zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number];
2414 vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION);
2415 vdo_set_completion_callback(&zone->completion, timeout_index_operations_callback,
2417 INIT_LIST_HEAD(&zone->lock_pool);
2418 result = uds_allocate(LOCK_POOL_CAPACITY, struct hash_lock, "hash_lock array",
2420 if (result != VDO_SUCCESS)
2423 for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2424 return_hash_lock_to_pool(zone, &zone->lock_array[i]);
2426 INIT_LIST_HEAD(&zone->available);
2427 INIT_LIST_HEAD(&zone->pending);
2428 result = uds_make_funnel_queue(&zone->timed_out_complete);
2429 if (result != VDO_SUCCESS)
2432 timer_setup(&zone->timer, timeout_index_operations, 0);
2434 for (i = 0; i < MAXIMUM_VDO_USER_VIOS; i++) {
2435 struct dedupe_context *context = &zone->contexts[i];
2437 context->zone = zone;
2438 context->request.callback = finish_index_operation;
2439 context->request.session = zones->index_session;
2440 list_add(&context->list_entry, &zone->available);
2443 return vdo_make_default_thread(vdo, zone->thread_id);
2446 /** get_thread_id_for_zone() - Implements vdo_zone_thread_getter_fn. */
2447 static thread_id_t get_thread_id_for_zone(void *context, zone_count_t zone_number)
2449 struct hash_zones *zones = context;
2451 return zones->zones[zone_number].thread_id;
2455 * vdo_make_hash_zones() - Create the hash zones.
2457 * @vdo: The vdo to which the zones will belong.
2458 * @zones_ptr: A pointer to hold the zones.
2460 * Return: VDO_SUCCESS or an error code.
2462 int vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr)
2465 struct hash_zones *zones;
2467 zone_count_t zone_count = vdo->thread_config.hash_zone_count;
2469 if (zone_count == 0)
2472 result = uds_allocate_extended(struct hash_zones, zone_count, struct hash_zone,
2474 if (result != VDO_SUCCESS)
2477 result = initialize_index(vdo, zones);
2478 if (result != VDO_SUCCESS) {
2483 vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW);
2485 zones->zone_count = zone_count;
2486 for (z = 0; z < zone_count; z++) {
2487 result = initialize_zone(vdo, zones, z);
2488 if (result != VDO_SUCCESS) {
2489 vdo_free_hash_zones(zones);
2494 result = vdo_make_action_manager(zones->zone_count, get_thread_id_for_zone,
2495 vdo->thread_config.admin_thread, zones, NULL,
2496 vdo, &zones->manager);
2497 if (result != VDO_SUCCESS) {
2498 vdo_free_hash_zones(zones);
2506 void vdo_finish_dedupe_index(struct hash_zones *zones)
2511 uds_destroy_index_session(uds_forget(zones->index_session));
2515 * vdo_free_hash_zones() - Free the hash zones.
2516 * @zones: The zones to free.
2518 void vdo_free_hash_zones(struct hash_zones *zones)
2525 uds_free(uds_forget(zones->manager));
2527 for (i = 0; i < zones->zone_count; i++) {
2528 struct hash_zone *zone = &zones->zones[i];
2530 uds_free_funnel_queue(uds_forget(zone->timed_out_complete));
2531 vdo_int_map_free(uds_forget(zone->hash_lock_map));
2532 uds_free(uds_forget(zone->lock_array));
2535 if (zones->index_session != NULL)
2536 vdo_finish_dedupe_index(zones);
2538 ratelimit_state_exit(&zones->ratelimiter);
2539 if (vdo_get_admin_state_code(&zones->state) == VDO_ADMIN_STATE_NEW)
2542 kobject_put(&zones->dedupe_directory);
2545 static void initiate_suspend_index(struct admin_state *state)
2547 struct hash_zones *zones = container_of(state, struct hash_zones, state);
2548 enum index_state index_state;
2550 spin_lock(&zones->lock);
2551 index_state = zones->index_state;
2552 spin_unlock(&zones->lock);
2554 if (index_state != IS_CLOSED) {
2555 bool save = vdo_is_state_saving(&zones->state);
2558 result = uds_suspend_index_session(zones->index_session, save);
2559 if (result != UDS_SUCCESS)
2560 uds_log_error_strerror(result, "Error suspending dedupe index");
2563 vdo_finish_draining(state);
2567 * suspend_index() - Suspend the UDS index prior to draining hash zones.
2569 * Implements vdo_action_preamble_fn
2571 static void suspend_index(void *context, struct vdo_completion *completion)
2573 struct hash_zones *zones = context;
2575 vdo_start_draining(&zones->state,
2576 vdo_get_current_manager_operation(zones->manager), completion,
2577 initiate_suspend_index);
2581 * initiate_drain() - Initiate a drain.
2583 * Implements vdo_admin_initiator_fn.
2585 static void initiate_drain(struct admin_state *state)
2587 check_for_drain_complete(container_of(state, struct hash_zone, state));
2591 * drain_hash_zone() - Drain a hash zone.
2593 * Implements vdo_zone_action_fn.
2595 static void drain_hash_zone(void *context, zone_count_t zone_number,
2596 struct vdo_completion *parent)
2598 struct hash_zones *zones = context;
2600 vdo_start_draining(&zones->zones[zone_number].state,
2601 vdo_get_current_manager_operation(zones->manager), parent,
2605 /** vdo_drain_hash_zones() - Drain all hash zones. */
2606 void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2608 vdo_schedule_operation(zones->manager, parent->vdo->suspend_type, suspend_index,
2609 drain_hash_zone, NULL, parent);
2612 static void launch_dedupe_state_change(struct hash_zones *zones)
2614 /* ASSERTION: We enter with the lock held. */
2615 if (zones->changing || !vdo_is_state_normal(&zones->state))
2616 /* Either a change is already in progress, or changes are not allowed. */
2619 if (zones->create_flag || (zones->index_state != zones->index_target)) {
2620 zones->changing = true;
2621 vdo_launch_completion(&zones->completion);
2625 /* ASSERTION: We exit with the lock held. */
2629 * resume_index() - Resume the UDS index prior to resuming hash zones.
2631 * Implements vdo_action_preamble_fn
2633 static void resume_index(void *context, struct vdo_completion *parent)
2635 struct hash_zones *zones = context;
2636 struct device_config *config = parent->vdo->device_config;
2639 zones->parameters.bdev = config->owned_device->bdev;
2640 result = uds_resume_index_session(zones->index_session, zones->parameters.bdev);
2641 if (result != UDS_SUCCESS)
2642 uds_log_error_strerror(result, "Error resuming dedupe index");
2644 spin_lock(&zones->lock);
2645 vdo_resume_if_quiescent(&zones->state);
2647 if (config->deduplication) {
2648 zones->index_target = IS_OPENED;
2649 WRITE_ONCE(zones->dedupe_flag, true);
2651 zones->index_target = IS_CLOSED;
2654 launch_dedupe_state_change(zones);
2655 spin_unlock(&zones->lock);
2657 vdo_finish_completion(parent);
2661 * resume_hash_zone() - Resume a hash zone.
2663 * Implements vdo_zone_action_fn.
2665 static void resume_hash_zone(void *context, zone_count_t zone_number,
2666 struct vdo_completion *parent)
2668 struct hash_zone *zone = &(((struct hash_zones *) context)->zones[zone_number]);
2670 vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
2674 * vdo_resume_hash_zones() - Resume a set of hash zones.
2675 * @zones: The hash zones to resume.
2676 * @parent: The object to notify when the zones have resumed.
2678 void vdo_resume_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2680 if (vdo_is_read_only(parent->vdo)) {
2681 vdo_launch_completion(parent);
2685 vdo_schedule_operation(zones->manager, VDO_ADMIN_STATE_RESUMING, resume_index,
2686 resume_hash_zone, NULL, parent);
2690 * get_hash_zone_statistics() - Add the statistics for this hash zone to the tally for all zones.
2691 * @zone: The hash zone to query.
2692 * @tally: The tally to which this zone's statistics are added.
2694 static void get_hash_zone_statistics(const struct hash_zone *zone,
2695 struct hash_lock_statistics *tally)
2697 const struct hash_lock_statistics *stats = &zone->statistics;
2699 tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid);
2700 tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale);
2701 tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches);
2702 tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions);
2703 tally->curr_dedupe_queries += READ_ONCE(zone->active);
2706 static void get_index_statistics(struct hash_zones *zones,
2707 struct index_statistics *stats)
2709 enum index_state state;
2710 struct uds_index_stats index_stats;
2713 spin_lock(&zones->lock);
2714 state = zones->index_state;
2715 spin_unlock(&zones->lock);
2717 if (state != IS_OPENED)
2720 result = uds_get_index_session_stats(zones->index_session, &index_stats);
2721 if (result != UDS_SUCCESS) {
2722 uds_log_error_strerror(result, "Error reading index stats");
2726 stats->entries_indexed = index_stats.entries_indexed;
2727 stats->posts_found = index_stats.posts_found;
2728 stats->posts_not_found = index_stats.posts_not_found;
2729 stats->queries_found = index_stats.queries_found;
2730 stats->queries_not_found = index_stats.queries_not_found;
2731 stats->updates_found = index_stats.updates_found;
2732 stats->updates_not_found = index_stats.updates_not_found;
2733 stats->entries_discarded = index_stats.entries_discarded;
2737 * vdo_get_dedupe_statistics() - Tally the statistics from all the hash zones and the UDS index.
2738 * @zones: The hash zones to query.
2740 * @stats: The statistics to be filled with the sum of the hash lock statistics from all hash zones plus the statistics from the UDS index.
2743 void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats)
2748 for (zone = 0; zone < zones->zone_count; zone++)
2749 get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock);
2751 get_index_statistics(zones, &stats->index);
2754 * zones->timeouts gives the number of timeouts, and dedupe_context_busy gives the number
2755 * of queries not made because of earlier timeouts.
2757 stats->dedupe_advice_timeouts =
2758 (atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy));
2762 * vdo_select_hash_zone() - Select the hash zone responsible for locking a given record name.
2763 * @zones: The hash_zones from which to select.
2764 * @name: The record name.
2766 * Return: The hash zone responsible for the record name.
2768 struct hash_zone *vdo_select_hash_zone(struct hash_zones *zones,
2769 const struct uds_record_name *name)
2772 * Use a fragment of the record name as a hash code. Eight bits of hash should suffice
2773 * since the number of hash zones is small.
2774 * TODO: Verify that the first byte is independent enough.
2776 u32 hash = name->name[0];
2779 * Scale the 8-bit hash fragment to a zone index by treating it as a binary fraction and
2780 * multiplying that by the zone count. If the hash is uniformly distributed over [0 ..
2781 * 2^8-1], then (hash * count / 2^8) should be uniformly distributed over [0 .. count-1].
2782 * The multiply and shift is much faster than a divide (modulus) on X86 CPUs.
2784 hash = (hash * zones->zone_count) >> 8;
2785 return &zones->zones[hash];
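/*
 * A minimal, illustrative sketch (not compiled into the driver) of the scaling property described
 * above: for any nonzero zone_count, every possible first name byte lands in
 * [0 .. zone_count - 1]. For example, with zone_count = 3, byte 0 maps to (0 * 3) >> 8 = 0,
 * byte 128 to (128 * 3) >> 8 = 1, and byte 255 to (255 * 3) >> 8 = 2.
 */
#if 0
static void sketch_check_zone_scaling(zone_count_t zone_count)
{
	u32 byte;

	for (byte = 0; byte <= 255; byte++) {
		u32 zone = (byte * zone_count) >> 8;

		ASSERT_LOG_ONLY(zone < zone_count, "scaled index %u in range", zone);
	}
}
#endif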
2789 * dump_hash_lock() - Dump a compact description of hash_lock to the log if the lock is not on the free list.
2791 * @lock: The hash lock to dump.
2793 static void dump_hash_lock(const struct hash_lock *lock)
2797 if (!list_empty(&lock->pool_node)) {
2798 /* This lock is on the free list. */
2803 * Necessarily cryptic since we can log a lot of these. The first three characters of the state
2804 * name are unambiguous. 'U' indicates a lock not registered in the map.
2806 state = get_hash_lock_state_name(lock->state);
2807 uds_log_info(" hl %px: %3.3s %c%llu/%u rc=%u wc=%zu agt=%px",
2808 lock, state, (lock->registered ? 'D' : 'U'),
2809 (unsigned long long) lock->duplicate.pbn,
2810 lock->duplicate.state, lock->reference_count,
2811 vdo_waitq_num_waiters(&lock->waiters), lock->agent);
2814 static const char *index_state_to_string(struct hash_zones *zones,
2815 enum index_state state)
2817 if (!vdo_is_state_normal(&zones->state))
2822 return zones->error_flag ? ERROR : CLOSED;
2824 return zones->index_target == IS_OPENED ? OPENING : CLOSING;
2826 return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE;
2833 * vdo_dump_hash_zone() - Dump information about a hash zone to the log for debugging.
2834 * @zone: The zone to dump.
2836 static void dump_hash_zone(const struct hash_zone *zone)
2840 if (zone->hash_lock_map == NULL) {
2841 uds_log_info("struct hash_zone %u: NULL map", zone->zone_number);
2845 uds_log_info("struct hash_zone %u: mapSize=%zu",
2846 zone->zone_number, vdo_int_map_size(zone->hash_lock_map));
2847 for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2848 dump_hash_lock(&zone->lock_array[i]);
2852 * vdo_dump_hash_zones() - Dump information about the hash zones to the log for debugging.
2853 * @zones: The zones to dump.
2855 void vdo_dump_hash_zones(struct hash_zones *zones)
2857 const char *state, *target;
2860 spin_lock(&zones->lock);
2861 state = index_state_to_string(zones, zones->index_state);
2862 target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL);
2863 spin_unlock(&zones->lock);
2865 uds_log_info("UDS index: state: %s", state);
2867 uds_log_info("UDS index: changing to state: %s", target);
2869 for (zone = 0; zone < zones->zone_count; zone++)
2870 dump_hash_zone(&zones->zones[zone]);
2873 void vdo_set_dedupe_index_timeout_interval(unsigned int value)
2877 /* Arbitrary maximum value is two minutes */
2880 /* Arbitrary minimum value is 2 jiffies */
2881 alb_jiffies = msecs_to_jiffies(value);
2883 if (alb_jiffies < 2) {
2885 value = jiffies_to_msecs(alb_jiffies);
2887 vdo_dedupe_index_timeout_interval = value;
2888 vdo_dedupe_index_timeout_jiffies = alb_jiffies;
2891 void vdo_set_dedupe_index_min_timer_interval(unsigned int value)
2895 /* Arbitrary maximum value is one second */
2899 /* Arbitrary minimum value is 2 jiffies */
2900 min_jiffies = msecs_to_jiffies(value);
2902 if (min_jiffies < 2) {
2904 value = jiffies_to_msecs(min_jiffies);
2907 vdo_dedupe_index_min_timer_interval = value;
2908 vdo_dedupe_index_min_timer_jiffies = min_jiffies;
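/*
 * A worked example of the clamping performed by the two setters above, assuming HZ = 250 (a
 * 4 ms jiffy): requesting a 1 ms interval converts to 1 jiffy, which is below the 2-jiffy floor,
 * so it is raised to 2 jiffies and reported back as 8 ms; a request above the arbitrary maximum
 * is similarly clamped down before conversion.
 */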
2912 * acquire_context() - Acquire a dedupe context from a hash_zone if any are available.
2913 * @zone: the hash zone
2915 * Return: A dedupe_context or NULL if none are available
2917 static struct dedupe_context * __must_check acquire_context(struct hash_zone *zone)
2919 struct dedupe_context *context;
2920 struct funnel_queue_entry *entry;
2922 assert_in_hash_zone(zone, __func__);
2924 if (!list_empty(&zone->available)) {
2925 WRITE_ONCE(zone->active, zone->active + 1);
2926 context = list_first_entry(&zone->available, struct dedupe_context,
2928 list_del_init(&context->list_entry);
2932 entry = uds_funnel_queue_poll(zone->timed_out_complete);
2933 return ((entry == NULL) ?
2934 NULL : container_of(entry, struct dedupe_context, queue_entry));
2937 static void prepare_uds_request(struct uds_request *request, struct data_vio *data_vio,
2938 enum uds_request_type operation)
2940 request->record_name = data_vio->record_name;
2941 request->type = operation;
2942 if ((operation == UDS_POST) || (operation == UDS_UPDATE)) {
2944 struct uds_record_data *encoding = &request->new_metadata;
2946 encoding->data[offset++] = UDS_ADVICE_VERSION;
2947 encoding->data[offset++] = data_vio->new_mapped.state;
2948 put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]);
2949 offset += sizeof(u64);
2950 BUG_ON(offset != UDS_ADVICE_SIZE);
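/*
 * A minimal, illustrative round-trip sketch (not compiled into the driver) showing that the
 * encoding written above is exactly what decode_uds_advice() reads back: a version byte, a
 * mapping-state byte, and the PBN as a little-endian u64. The buffer and PBN value here are
 * arbitrary.
 */
#if 0
static void sketch_advice_round_trip(void)
{
	u8 buffer[UDS_ADVICE_SIZE];
	size_t offset = 0;
	u64 pbn = 12345;

	buffer[offset++] = UDS_ADVICE_VERSION;
	buffer[offset++] = 0; /* any mapping state byte */
	put_unaligned_le64(pbn, &buffer[offset]);
	offset += sizeof(u64);
	BUG_ON(offset != UDS_ADVICE_SIZE);
	BUG_ON(buffer[0] != UDS_ADVICE_VERSION);
	BUG_ON(get_unaligned_le64(&buffer[2]) != pbn);
}
#endif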
2955 * The index operation will inquire about data_vio.record_name, providing (if the operation is
2956 * appropriate) advice from the data_vio's new_mapped fields. The advice found in the index (or
2957 * NULL if none) will be handed back to the data_vio's hash zone thread (see finish_querying()). dedupe_context.status is
2958 * set to the return status code of any asynchronous index processing.
2960 static void query_index(struct data_vio *data_vio, enum uds_request_type operation)
2963 struct dedupe_context *context;
2964 struct vdo *vdo = vdo_from_data_vio(data_vio);
2965 struct hash_zone *zone = data_vio->hash_zone;
2967 assert_data_vio_in_hash_zone(data_vio);
2969 if (!READ_ONCE(vdo->hash_zones->dedupe_flag)) {
2970 continue_data_vio(data_vio);
2974 context = acquire_context(zone);
2975 if (context == NULL) {
2976 atomic64_inc(&vdo->hash_zones->dedupe_context_busy);
2977 continue_data_vio(data_vio);
2981 data_vio->dedupe_context = context;
2982 context->requestor = data_vio;
2983 context->submission_jiffies = jiffies;
2984 prepare_uds_request(&context->request, data_vio, operation);
2985 atomic_set(&context->state, DEDUPE_CONTEXT_PENDING);
2986 list_add_tail(&context->list_entry, &zone->pending);
2987 start_expiration_timer(context);
2988 result = uds_launch_request(&context->request);
2989 if (result != UDS_SUCCESS) {
2990 context->request.status = result;
2991 finish_index_operation(&context->request);
2995 static void set_target_state(struct hash_zones *zones, enum index_state target,
2996 bool change_dedupe, bool dedupe, bool set_create)
2998 const char *old_state, *new_state;
3000 spin_lock(&zones->lock);
3001 old_state = index_state_to_string(zones, zones->index_target);
3003 WRITE_ONCE(zones->dedupe_flag, dedupe);
3006 zones->create_flag = true;
3008 zones->index_target = target;
3009 launch_dedupe_state_change(zones);
3010 new_state = index_state_to_string(zones, zones->index_target);
3011 spin_unlock(&zones->lock);
3013 if (old_state != new_state)
3014 uds_log_info("Setting UDS index target state to %s", new_state);
3017 const char *vdo_get_dedupe_index_state_name(struct hash_zones *zones)
3021 spin_lock(&zones->lock);
3022 state = index_state_to_string(zones, zones->index_state);
3023 spin_unlock(&zones->lock);
3028 /* Handle a dmsetup message relevant to the index. */
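/*
 * For example, assuming a VDO target named "vdo0", deduplication could be toggled at runtime with
 * something like:
 *
 *   dmsetup message vdo0 0 index-enable
 *   dmsetup message vdo0 0 index-disable
 */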
3029 int vdo_message_dedupe_index(struct hash_zones *zones, const char *name)
3031 if (strcasecmp(name, "index-close") == 0) {
3032 set_target_state(zones, IS_CLOSED, false, false, false);
3034 } else if (strcasecmp(name, "index-create") == 0) {
3035 set_target_state(zones, IS_OPENED, false, false, true);
3037 } else if (strcasecmp(name, "index-disable") == 0) {
3038 set_target_state(zones, IS_OPENED, true, false, false);
3040 } else if (strcasecmp(name, "index-enable") == 0) {
3041 set_target_state(zones, IS_OPENED, true, true, false);
3048 int vdo_add_dedupe_index_sysfs(struct hash_zones *zones)
3050 int result = kobject_add(&zones->dedupe_directory,
3051 &zones->completion.vdo->vdo_directory, "dedupe");
3054 vdo_set_admin_state_code(&zones->state,
3055 VDO_ADMIN_STATE_NORMAL_OPERATION);
3060 /* If create_flag, create a new index without first attempting to load an existing index. */
3061 void vdo_start_dedupe_index(struct hash_zones *zones, bool create_flag)
3063 set_target_state(zones, IS_OPENED, true, true, create_flag);