dm vdo errors: remove unused error codes
[linux-block.git] / drivers / md / dm-vdo / funnel-workqueue.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6 #include "funnel-workqueue.h"
7
8 #include <linux/atomic.h>
9 #include <linux/cache.h>
10 #include <linux/completion.h>
11 #include <linux/err.h>
12 #include <linux/kthread.h>
13 #include <linux/percpu.h>
14
15 #include "funnel-queue.h"
16 #include "logger.h"
17 #include "memory-alloc.h"
18 #include "numeric.h"
19 #include "permassert.h"
20 #include "string-utils.h"
21
22 #include "completion.h"
23 #include "status-codes.h"
24
25 static DEFINE_PER_CPU(unsigned int, service_queue_rotor);
26
27 /**
28  * DOC: Work queue definition.
29  *
30  * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
31  * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
32  * Externally, both are represented via the same common sub-structure, though there's actually not
33  * a great deal of overlap between the two types internally.
34  */
35 struct vdo_work_queue {
36         /* Name of just the work queue (e.g., "cpuQ12") */
37         char *name;
38         bool round_robin_mode;
39         struct vdo_thread *owner;
40         /* Life cycle functions, etc */
41         const struct vdo_work_queue_type *type;
42 };
43
44 struct simple_work_queue {
45         struct vdo_work_queue common;
46         struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
47         void *private;
48
49         /*
50          * The fields above are unchanged after setup but often read, and are good candidates for
51          * caching -- and if the max priority is 2, just fit in one x86-64 cache line if aligned.
52          * The fields below are often modified as we sleep and wake, so we want a separate cache
53          * line for performance.
54          */
55
56         /* Any (0 or 1) worker threads waiting for new work to do */
57         wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
58         /* Hack to reduce wakeup calls if the worker thread is running */
59         atomic_t idle;
60
61         /* These are infrequently used so in terms of performance we don't care where they land. */
62         struct task_struct *thread;
63         /* Notify creator once worker has initialized */
64         struct completion *started;
65 };
66
67 struct round_robin_work_queue {
68         struct vdo_work_queue common;
69         struct simple_work_queue **service_queues;
70         unsigned int num_service_queues;
71 };
72
73 static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
74 {
75         return ((queue == NULL) ?
76                 NULL : container_of(queue, struct simple_work_queue, common));
77 }
78
79 static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue)
80 {
81         return ((queue == NULL) ?
82                  NULL :
83                  container_of(queue, struct round_robin_work_queue, common));
84 }
85
86 /* Processing normal completions. */
87
88 /*
89  * Dequeue and return the next waiting completion, if any.
90  *
91  * We scan the funnel queues from highest priority to lowest, once; there is therefore a race
92  * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
93  * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
94  * enforcement of priorities becomes necessary, this function will need fixing.
95  */
96 static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
97 {
98         int i;
99
100         for (i = queue->common.type->max_priority; i >= 0; i--) {
101                 struct funnel_queue_entry *link = uds_funnel_queue_poll(queue->priority_lists[i]);
102
103                 if (link != NULL)
104                         return container_of(link, struct vdo_completion, work_queue_entry_link);
105         }
106
107         return NULL;
108 }
109
110 static void enqueue_work_queue_completion(struct simple_work_queue *queue,
111                                           struct vdo_completion *completion)
112 {
113         ASSERT_LOG_ONLY(completion->my_queue == NULL,
114                         "completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
115                         completion, completion->callback, queue, completion->my_queue);
116         if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
117                 completion->priority = queue->common.type->default_priority;
118
119         if (ASSERT(completion->priority <= queue->common.type->max_priority,
120                    "priority is in range for queue") != VDO_SUCCESS)
121                 completion->priority = 0;
122
123         completion->my_queue = &queue->common;
124
125         /* Funnel queue handles the synchronization for the put. */
126         uds_funnel_queue_put(queue->priority_lists[completion->priority],
127                              &completion->work_queue_entry_link);
128
129         /*
130          * Due to how funnel queue synchronization is handled (just atomic operations), the
131          * simplest safe implementation here would be to wake-up any waiting threads after
132          * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
133          * item to the queue, the consumer thread may not see this since it is not guaranteed to
134          * have the same view of the queue as a producer thread.
135          *
136          * However, the above is wasteful so instead we attempt to minimize the number of thread
137          * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
138          * able to determine when the worker thread might be asleep or going to sleep. We use
139          * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
140          * waking the worker thread, so multiple wakeups aren't tried at once.
141          *
142          * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
143          * first is any better or worse for other platforms, even other x86 configurations.
144          */
145         smp_mb();
146         if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
147                 return;
148
149         /* There's a maximum of one thread in this list. */
150         wake_up(&queue->waiting_worker_threads);
151 }
152
153 static void run_start_hook(struct simple_work_queue *queue)
154 {
155         if (queue->common.type->start != NULL)
156                 queue->common.type->start(queue->private);
157 }
158
159 static void run_finish_hook(struct simple_work_queue *queue)
160 {
161         if (queue->common.type->finish != NULL)
162                 queue->common.type->finish(queue->private);
163 }
164
165 /*
166  * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
167  * for us to shut down.
168  *
169  * If kthread_should_stop says it's time to stop but we have pending completions return a
170  * completion.
171  *
172  * Also update statistics relating to scheduler interactions.
173  */
174 static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
175 {
176         struct vdo_completion *completion;
177         DEFINE_WAIT(wait);
178
179         while (true) {
180                 prepare_to_wait(&queue->waiting_worker_threads, &wait,
181                                 TASK_INTERRUPTIBLE);
182                 /*
183                  * Don't set the idle flag until a wakeup will not be lost.
184                  *
185                  * Force synchronization between setting the idle flag and checking the funnel
186                  * queue; the producer side will do them in the reverse order. (There's still a
187                  * race condition we've chosen to allow, because we've got a timeout below that
188                  * unwedges us if we hit it, but this may narrow the window a little.)
189                  */
190                 atomic_set(&queue->idle, 1);
191                 smp_mb(); /* store-load barrier between "idle" and funnel queue */
192
193                 completion = poll_for_completion(queue);
194                 if (completion != NULL)
195                         break;
196
197                 /*
198                  * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
199                  * above. Otherwise, schedule() will put the thread to sleep and might miss a
200                  * wakeup from kthread_stop() call in vdo_finish_work_queue().
201                  */
202                 if (kthread_should_stop())
203                         break;
204
205                 schedule();
206
207                 /*
208                  * Most of the time when we wake, it should be because there's work to do. If it
209                  * was a spurious wakeup, continue looping.
210                  */
211                 completion = poll_for_completion(queue);
212                 if (completion != NULL)
213                         break;
214         }
215
216         finish_wait(&queue->waiting_worker_threads, &wait);
217         atomic_set(&queue->idle, 0);
218
219         return completion;
220 }
221
222 static void process_completion(struct simple_work_queue *queue,
223                                struct vdo_completion *completion)
224 {
225         if (ASSERT(completion->my_queue == &queue->common,
226                    "completion %px from queue %px marked as being in this queue (%px)",
227                    completion, queue, completion->my_queue) == UDS_SUCCESS)
228                 completion->my_queue = NULL;
229
230         vdo_run_completion(completion);
231 }
232
233 static void service_work_queue(struct simple_work_queue *queue)
234 {
235         run_start_hook(queue);
236
237         while (true) {
238                 struct vdo_completion *completion = poll_for_completion(queue);
239
240                 if (completion == NULL)
241                         completion = wait_for_next_completion(queue);
242
243                 if (completion == NULL) {
244                         /* No completions but kthread_should_stop() was triggered. */
245                         break;
246                 }
247
248                 process_completion(queue, completion);
249
250                 /*
251                  * Be friendly to a CPU that has other work to do, if the kernel has told us to.
252                  * This speeds up some performance tests; that "other work" might include other VDO
253                  * threads.
254                  */
255                 if (need_resched())
256                         cond_resched();
257         }
258
259         run_finish_hook(queue);
260 }
261
262 static int work_queue_runner(void *ptr)
263 {
264         struct simple_work_queue *queue = ptr;
265
266         complete(queue->started);
267         service_work_queue(queue);
268         return 0;
269 }
270
271 /* Creation & teardown */
272
273 static void free_simple_work_queue(struct simple_work_queue *queue)
274 {
275         unsigned int i;
276
277         for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++)
278                 uds_free_funnel_queue(queue->priority_lists[i]);
279         vdo_free(queue->common.name);
280         vdo_free(queue);
281 }
282
283 static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
284 {
285         struct simple_work_queue **queue_table = queue->service_queues;
286         unsigned int count = queue->num_service_queues;
287         unsigned int i;
288
289         queue->service_queues = NULL;
290
291         for (i = 0; i < count; i++)
292                 free_simple_work_queue(queue_table[i]);
293         vdo_free(queue_table);
294         vdo_free(queue->common.name);
295         vdo_free(queue);
296 }
297
298 void vdo_free_work_queue(struct vdo_work_queue *queue)
299 {
300         if (queue == NULL)
301                 return;
302
303         vdo_finish_work_queue(queue);
304
305         if (queue->round_robin_mode)
306                 free_round_robin_work_queue(as_round_robin_work_queue(queue));
307         else
308                 free_simple_work_queue(as_simple_work_queue(queue));
309 }
310
311 static int make_simple_work_queue(const char *thread_name_prefix, const char *name,
312                                   struct vdo_thread *owner, void *private,
313                                   const struct vdo_work_queue_type *type,
314                                   struct simple_work_queue **queue_ptr)
315 {
316         DECLARE_COMPLETION_ONSTACK(started);
317         struct simple_work_queue *queue;
318         int i;
319         struct task_struct *thread = NULL;
320         int result;
321
322         ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
323                         "queue priority count %u within limit %u", type->max_priority,
324                         VDO_WORK_Q_MAX_PRIORITY);
325
326         result = vdo_allocate(1, struct simple_work_queue, "simple work queue", &queue);
327         if (result != UDS_SUCCESS)
328                 return result;
329
330         queue->private = private;
331         queue->started = &started;
332         queue->common.type = type;
333         queue->common.owner = owner;
334         init_waitqueue_head(&queue->waiting_worker_threads);
335
336         result = vdo_duplicate_string(name, "queue name", &queue->common.name);
337         if (result != VDO_SUCCESS) {
338                 vdo_free(queue);
339                 return -ENOMEM;
340         }
341
342         for (i = 0; i <= type->max_priority; i++) {
343                 result = uds_make_funnel_queue(&queue->priority_lists[i]);
344                 if (result != UDS_SUCCESS) {
345                         free_simple_work_queue(queue);
346                         return result;
347                 }
348         }
349
350         thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix,
351                              queue->common.name);
352         if (IS_ERR(thread)) {
353                 free_simple_work_queue(queue);
354                 return (int) PTR_ERR(thread);
355         }
356
357         queue->thread = thread;
358
359         /*
360          * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
361          * errors elsewhere) could cause it to never get as far as running VDO, skipping the
362          * cleanup code.
363          *
364          * Eventually we should just make that path safe too, and then we won't need this
365          * synchronization.
366          */
367         wait_for_completion(&started);
368
369         *queue_ptr = queue;
370         return UDS_SUCCESS;
371 }
372
373 /**
374  * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
375  *                         be distributed to them in round-robin fashion.
376  *
377  * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
378  * of the actual number of queues and threads allocated here, code outside of the queue
379  * implementation will treat this as a single zone.
380  */
381 int vdo_make_work_queue(const char *thread_name_prefix, const char *name,
382                         struct vdo_thread *owner, const struct vdo_work_queue_type *type,
383                         unsigned int thread_count, void *thread_privates[],
384                         struct vdo_work_queue **queue_ptr)
385 {
386         struct round_robin_work_queue *queue;
387         int result;
388         char thread_name[TASK_COMM_LEN];
389         unsigned int i;
390
391         if (thread_count == 1) {
392                 struct simple_work_queue *simple_queue;
393                 void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);
394
395                 result = make_simple_work_queue(thread_name_prefix, name, owner, context,
396                                                 type, &simple_queue);
397                 if (result == VDO_SUCCESS)
398                         *queue_ptr = &simple_queue->common;
399                 return result;
400         }
401
402         result = vdo_allocate(1, struct round_robin_work_queue, "round-robin work queue",
403                               &queue);
404         if (result != UDS_SUCCESS)
405                 return result;
406
407         result = vdo_allocate(thread_count, struct simple_work_queue *,
408                               "subordinate work queues", &queue->service_queues);
409         if (result != UDS_SUCCESS) {
410                 vdo_free(queue);
411                 return result;
412         }
413
414         queue->num_service_queues = thread_count;
415         queue->common.round_robin_mode = true;
416         queue->common.owner = owner;
417
418         result = vdo_duplicate_string(name, "queue name", &queue->common.name);
419         if (result != VDO_SUCCESS) {
420                 vdo_free(queue->service_queues);
421                 vdo_free(queue);
422                 return -ENOMEM;
423         }
424
425         *queue_ptr = &queue->common;
426
427         for (i = 0; i < thread_count; i++) {
428                 void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);
429
430                 snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
431                 result = make_simple_work_queue(thread_name_prefix, thread_name, owner,
432                                                 context, type, &queue->service_queues[i]);
433                 if (result != VDO_SUCCESS) {
434                         queue->num_service_queues = i;
435                         /* Destroy previously created subordinates. */
436                         vdo_free_work_queue(vdo_forget(*queue_ptr));
437                         return result;
438                 }
439         }
440
441         return VDO_SUCCESS;
442 }
443
444 static void finish_simple_work_queue(struct simple_work_queue *queue)
445 {
446         if (queue->thread == NULL)
447                 return;
448
449         /* Tells the worker thread to shut down and waits for it to exit. */
450         kthread_stop(queue->thread);
451         queue->thread = NULL;
452 }
453
454 static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
455 {
456         struct simple_work_queue **queue_table = queue->service_queues;
457         unsigned int count = queue->num_service_queues;
458         unsigned int i;
459
460         for (i = 0; i < count; i++)
461                 finish_simple_work_queue(queue_table[i]);
462 }
463
464 /* No enqueueing of completions should be done once this function is called. */
465 void vdo_finish_work_queue(struct vdo_work_queue *queue)
466 {
467         if (queue == NULL)
468                 return;
469
470         if (queue->round_robin_mode)
471                 finish_round_robin_work_queue(as_round_robin_work_queue(queue));
472         else
473                 finish_simple_work_queue(as_simple_work_queue(queue));
474 }
475
476 /* Debugging dumps */
477
478 static void dump_simple_work_queue(struct simple_work_queue *queue)
479 {
480         const char *thread_status = "no threads";
481         char task_state_report = '-';
482
483         if (queue->thread != NULL) {
484                 task_state_report = task_state_to_char(queue->thread);
485                 thread_status = atomic_read(&queue->idle) ? "idle" : "running";
486         }
487
488         uds_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name,
489                      thread_status, task_state_report);
490
491         /* ->waiting_worker_threads wait queue status? anyone waiting? */
492 }
493
494 /*
495  * Write to the buffer some info about the completion, for logging. Since the common use case is
496  * dumping info about a lot of completions to syslog all at once, the format favors brevity over
497  * readability.
498  */
499 void vdo_dump_work_queue(struct vdo_work_queue *queue)
500 {
501         if (queue->round_robin_mode) {
502                 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
503                 unsigned int i;
504
505                 for (i = 0; i < round_robin->num_service_queues; i++)
506                         dump_simple_work_queue(round_robin->service_queues[i]);
507         } else {
508                 dump_simple_work_queue(as_simple_work_queue(queue));
509         }
510 }
511
512 static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
513 {
514         if (pointer == NULL) {
515                 /*
516                  * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
517                  * sometimes use this when logging lots of data; don't be so verbose.
518                  */
519                 strscpy(buffer, "-", buffer_length);
520         } else {
521                 /*
522                  * Use a pragma to defeat gcc's format checking, which doesn't understand that
523                  * "%ps" actually does support a precision spec in Linux kernel code.
524                  */
525                 char *space;
526
527 #pragma GCC diagnostic push
528 #pragma GCC diagnostic ignored "-Wformat"
529                 snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer);
530 #pragma GCC diagnostic pop
531
532                 space = strchr(buffer, ' ');
533                 if (space != NULL)
534                         *space = '\0';
535         }
536 }
537
538 void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer,
539                                    size_t length)
540 {
541         size_t current_length =
542                 scnprintf(buffer, length, "%.*s/", TASK_COMM_LEN,
543                           (completion->my_queue == NULL ? "-" : completion->my_queue->name));
544
545         if (current_length < length - 1) {
546                 get_function_name((void *) completion->callback, buffer + current_length,
547                                   length - current_length);
548         }
549 }
550
551 /* Completion submission */
552 /*
553  * If the completion has a timeout that has already passed, the timeout handler function may be
554  * invoked by this function.
555  */
556 void vdo_enqueue_work_queue(struct vdo_work_queue *queue,
557                             struct vdo_completion *completion)
558 {
559         /*
560          * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
561          * on.
562          */
563         struct simple_work_queue *simple_queue = NULL;
564
565         if (!queue->round_robin_mode) {
566                 simple_queue = as_simple_work_queue(queue);
567         } else {
568                 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
569
570                 /*
571                  * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
572                  * Any patterns that might develop are likely to be disrupted by random ordering of
573                  * multiple completions and migration between cores, unless the load is so light as
574                  * to be regular in ordering of tasks and the threads are confined to individual
575                  * cores; with a load that light we won't care.
576                  */
577                 unsigned int rotor = this_cpu_inc_return(service_queue_rotor);
578                 unsigned int index = rotor % round_robin->num_service_queues;
579
580                 simple_queue = round_robin->service_queues[index];
581         }
582
583         enqueue_work_queue_completion(simple_queue, completion);
584 }
585
586 /* Misc */
587
588 /*
589  * Return the work queue pointer recorded at initialization time in the work-queue stack handle
590  * initialized on the stack of the current thread, if any.
591  */
592 static struct simple_work_queue *get_current_thread_work_queue(void)
593 {
594         /*
595          * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
596          * the queue for the thread which was interrupted. However, the interrupted thread may have
597          * been processing a completion, in which case starting to process another would violate
598          * our concurrency assumptions.
599          */
600         if (in_interrupt())
601                 return NULL;
602
603         if (kthread_func(current) != work_queue_runner)
604                 /* Not a VDO work queue thread. */
605                 return NULL;
606
607         return kthread_data(current);
608 }
609
610 struct vdo_work_queue *vdo_get_current_work_queue(void)
611 {
612         struct simple_work_queue *queue = get_current_thread_work_queue();
613
614         return (queue == NULL) ? NULL : &queue->common;
615 }
616
617 struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
618 {
619         return queue->owner;
620 }
621
622 /**
623  * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
624  *                                     queue, or NULL if none or if the current thread is not a
625  *                                     work queue thread.
626  */
627 void *vdo_get_work_queue_private_data(void)
628 {
629         struct simple_work_queue *queue = get_current_thread_work_queue();
630
631         return (queue != NULL) ? queue->private : NULL;
632 }
633
634 bool vdo_work_queue_type_is(struct vdo_work_queue *queue,
635                             const struct vdo_work_queue_type *type)
636 {
637         return (queue->type == type);
638 }