ring-buffer: Fix a race between readers and resize checks
[linux-2.6-block.git] / kernel / trace / ring_buffer.c
CommitLineData
bcea3f96 1// SPDX-License-Identifier: GPL-2.0
7a8e76a3
SR
2/*
3 * Generic ring buffer
4 *
5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6 */
28575c61 7#include <linux/trace_recursion.h>
af658dca 8#include <linux/trace_events.h>
7a8e76a3 9#include <linux/ring_buffer.h>
14131f2f 10#include <linux/trace_clock.h>
e6017571 11#include <linux/sched/clock.h>
117c3920 12#include <linux/cacheflush.h>
0b07436d 13#include <linux/trace_seq.h>
7a8e76a3 14#include <linux/spinlock.h>
15693458 15#include <linux/irq_work.h>
a356646a 16#include <linux/security.h>
7a8e76a3 17#include <linux/uaccess.h>
a81bd80a 18#include <linux/hardirq.h>
6c43e554 19#include <linux/kthread.h> /* for self test */
7a8e76a3
SR
20#include <linux/module.h>
21#include <linux/percpu.h>
22#include <linux/mutex.h>
6c43e554 23#include <linux/delay.h>
5a0e3ad6 24#include <linux/slab.h>
7a8e76a3
SR
25#include <linux/init.h>
26#include <linux/hash.h>
27#include <linux/list.h>
554f786e 28#include <linux/cpu.h>
927e56db 29#include <linux/oom.h>
117c3920 30#include <linux/mm.h>
7a8e76a3 31
c84897c0 32#include <asm/local64.h>
79615760 33#include <asm/local.h>
182e9f5f 34
6695da58
SRG
35/*
36 * The "absolute" timestamp in the buffer is only 59 bits.
37 * If a clock has the 5 MSBs set, it needs to be saved and
38 * reinserted.
39 */
40#define TS_MSB (0xf8ULL << 56)
41#define ABS_TS_MASK (~TS_MSB)
42
83f40318
VN
43static void update_pages_handler(struct work_struct *work);
44
d1b182a8
SR
45/*
46 * The ring buffer header is special. We must manually up keep it.
47 */
48int ring_buffer_print_entry_header(struct trace_seq *s)
49{
c0cd93aa
SRRH
50 trace_seq_puts(s, "# compressed entry header\n");
51 trace_seq_puts(s, "\ttype_len : 5 bits\n");
52 trace_seq_puts(s, "\ttime_delta : 27 bits\n");
53 trace_seq_puts(s, "\tarray : 32 bits\n");
54 trace_seq_putc(s, '\n');
55 trace_seq_printf(s, "\tpadding : type == %d\n",
56 RINGBUF_TYPE_PADDING);
57 trace_seq_printf(s, "\ttime_extend : type == %d\n",
58 RINGBUF_TYPE_TIME_EXTEND);
dc4e2801
TZ
59 trace_seq_printf(s, "\ttime_stamp : type == %d\n",
60 RINGBUF_TYPE_TIME_STAMP);
c0cd93aa
SRRH
61 trace_seq_printf(s, "\tdata max type_len == %d\n",
62 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
63
64 return !trace_seq_has_overflowed(s);
d1b182a8
SR
65}
66
5cc98548
SR
67/*
68 * The ring buffer is made up of a list of pages. A separate list of pages is
69 * allocated for each CPU. A writer may only write to a buffer that is
70 * associated with the CPU it is currently executing on. A reader may read
71 * from any per cpu buffer.
72 *
73 * The reader is special. For each per cpu buffer, the reader has its own
74 * reader page. When a reader has read the entire reader page, this reader
75 * page is swapped with another page in the ring buffer.
76 *
77 * Now, as long as the writer is off the reader page, the reader can do what
78 * ever it wants with that page. The writer will never write to that page
79 * again (as long as it is out of the ring buffer).
80 *
81 * Here's some silly ASCII art.
82 *
83 * +------+
84 * |reader| RING BUFFER
85 * |page |
86 * +------+ +---+ +---+ +---+
87 * | |-->| |-->| |
88 * +---+ +---+ +---+
89 * ^ |
90 * | |
91 * +---------------+
92 *
93 *
94 * +------+
95 * |reader| RING BUFFER
96 * |page |------------------v
97 * +------+ +---+ +---+ +---+
98 * | |-->| |-->| |
99 * +---+ +---+ +---+
100 * ^ |
101 * | |
102 * +---------------+
103 *
104 *
105 * +------+
106 * |reader| RING BUFFER
107 * |page |------------------v
108 * +------+ +---+ +---+ +---+
109 * ^ | |-->| |-->| |
110 * | +---+ +---+ +---+
111 * | |
112 * | |
113 * +------------------------------+
114 *
115 *
116 * +------+
117 * |buffer| RING BUFFER
118 * |page |------------------v
119 * +------+ +---+ +---+ +---+
120 * ^ | | | |-->| |
121 * | New +---+ +---+ +---+
122 * | Reader------^ |
123 * | page |
124 * +------------------------------+
125 *
126 *
127 * After we make this swap, the reader can hand this page off to the splice
128 * code and be done with it. It can even allocate a new page if it needs to
129 * and swap that into the ring buffer.
130 *
131 * We will be using cmpxchg soon to make all this lockless.
132 *
133 */
134
499e5470
SR
135/* Used for individual buffers (after the counter) */
136#define RB_BUFFER_OFF (1 << 20)
a3583244 137
499e5470 138#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
033601a3 139
e3d6bf0a 140#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
67d34724 141#define RB_ALIGNMENT 4U
334d4169 142#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
c7b09308 143#define RB_EVNT_MIN_SIZE 8U /* two 32bit words */
adab66b7
SRV
144
145#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
146# define RB_FORCE_8BYTE_ALIGNMENT 0
147# define RB_ARCH_ALIGNMENT RB_ALIGNMENT
148#else
149# define RB_FORCE_8BYTE_ALIGNMENT 1
150# define RB_ARCH_ALIGNMENT 8U
151#endif
152
153#define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT)
649508f6 154
334d4169
LJ
155/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
156#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
7a8e76a3
SR
157
158enum {
159 RB_LEN_TIME_EXTEND = 8,
dc4e2801 160 RB_LEN_TIME_STAMP = 8,
7a8e76a3
SR
161};
162
69d1b839
SR
163#define skip_time_extend(event) \
164 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
165
dc4e2801
TZ
166#define extended_time(event) \
167 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
168
bc92b956 169static inline bool rb_null_event(struct ring_buffer_event *event)
2d622719 170{
a1863c21 171 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
2d622719
TZ
172}
173
174static void rb_event_set_padding(struct ring_buffer_event *event)
175{
a1863c21 176 /* padding has a NULL time_delta */
334d4169 177 event->type_len = RINGBUF_TYPE_PADDING;
2d622719
TZ
178 event->time_delta = 0;
179}
180
34a148bf 181static unsigned
2d622719 182rb_event_data_length(struct ring_buffer_event *event)
7a8e76a3
SR
183{
184 unsigned length;
185
334d4169
LJ
186 if (event->type_len)
187 length = event->type_len * RB_ALIGNMENT;
2d622719
TZ
188 else
189 length = event->array[0];
190 return length + RB_EVNT_HDR_SIZE;
191}
192
69d1b839
SR
193/*
194 * Return the length of the given event. Will return
195 * the length of the time extend if the event is a
196 * time extend.
197 */
198static inline unsigned
2d622719
TZ
199rb_event_length(struct ring_buffer_event *event)
200{
334d4169 201 switch (event->type_len) {
7a8e76a3 202 case RINGBUF_TYPE_PADDING:
2d622719
TZ
203 if (rb_null_event(event))
204 /* undefined */
205 return -1;
334d4169 206 return event->array[0] + RB_EVNT_HDR_SIZE;
7a8e76a3
SR
207
208 case RINGBUF_TYPE_TIME_EXTEND:
209 return RB_LEN_TIME_EXTEND;
210
211 case RINGBUF_TYPE_TIME_STAMP:
212 return RB_LEN_TIME_STAMP;
213
214 case RINGBUF_TYPE_DATA:
2d622719 215 return rb_event_data_length(event);
7a8e76a3 216 default:
da4d401a 217 WARN_ON_ONCE(1);
7a8e76a3
SR
218 }
219 /* not hit */
220 return 0;
221}
222
69d1b839
SR
223/*
224 * Return total length of time extend and data,
225 * or just the event length for all other events.
226 */
227static inline unsigned
228rb_event_ts_length(struct ring_buffer_event *event)
229{
230 unsigned len = 0;
231
dc4e2801 232 if (extended_time(event)) {
69d1b839
SR
233 /* time extends include the data event after it */
234 len = RB_LEN_TIME_EXTEND;
235 event = skip_time_extend(event);
236 }
237 return len + rb_event_length(event);
238}
239
7a8e76a3
SR
240/**
241 * ring_buffer_event_length - return the length of the event
242 * @event: the event to get the length of
69d1b839
SR
243 *
244 * Returns the size of the data load of a data event.
245 * If the event is something other than a data event, it
246 * returns the size of the event itself. With the exception
247 * of a TIME EXTEND, where it still returns the size of the
248 * data load of the data event after it.
7a8e76a3
SR
249 */
250unsigned ring_buffer_event_length(struct ring_buffer_event *event)
251{
69d1b839
SR
252 unsigned length;
253
dc4e2801 254 if (extended_time(event))
69d1b839
SR
255 event = skip_time_extend(event);
256
257 length = rb_event_length(event);
334d4169 258 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
465634ad
RR
259 return length;
260 length -= RB_EVNT_HDR_SIZE;
261 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
262 length -= sizeof(event->array[0]);
263 return length;
7a8e76a3 264}
c4f50183 265EXPORT_SYMBOL_GPL(ring_buffer_event_length);
7a8e76a3
SR
266
267/* inline for ring buffer fast paths */
929ddbf3 268static __always_inline void *
7a8e76a3
SR
269rb_event_data(struct ring_buffer_event *event)
270{
dc4e2801 271 if (extended_time(event))
69d1b839 272 event = skip_time_extend(event);
da4d401a 273 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
7a8e76a3 274 /* If length is in len field, then array[0] has the data */
334d4169 275 if (event->type_len)
7a8e76a3
SR
276 return (void *)&event->array[0];
277 /* Otherwise length is in array[0] and array[1] has the data */
278 return (void *)&event->array[1];
279}
280
281/**
282 * ring_buffer_event_data - return the data of the event
283 * @event: the event to get the data from
284 */
285void *ring_buffer_event_data(struct ring_buffer_event *event)
286{
287 return rb_event_data(event);
288}
c4f50183 289EXPORT_SYMBOL_GPL(ring_buffer_event_data);
7a8e76a3
SR
290
291#define for_each_buffer_cpu(buffer, cpu) \
9e01c1b7 292 for_each_cpu(cpu, buffer->cpumask)
7a8e76a3 293
b23d7a5f
NP
294#define for_each_online_buffer_cpu(buffer, cpu) \
295 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)
296
7a8e76a3
SR
297#define TS_SHIFT 27
298#define TS_MASK ((1ULL << TS_SHIFT) - 1)
299#define TS_DELTA_TEST (~TS_MASK)
300
e20044f7
SRV
301static u64 rb_event_time_stamp(struct ring_buffer_event *event)
302{
303 u64 ts;
304
305 ts = event->array[0];
306 ts <<= TS_SHIFT;
307 ts += event->time_delta;
308
309 return ts;
310}
311
66a8cb95
SR
312/* Flag when events were overwritten */
313#define RB_MISSED_EVENTS (1 << 31)
ff0ff84a
SR
314/* Missed count stored at end */
315#define RB_MISSED_STORED (1 << 30)
66a8cb95 316
fe832be0
SRG
317#define RB_MISSED_MASK (3 << 30)
318
abc9b56d 319struct buffer_data_page {
e4c2ce82 320 u64 time_stamp; /* page time stamp */
c3706f00 321 local_t commit; /* write committed index */
649508f6 322 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */
abc9b56d
SR
323};
324
bce761d7
TSV
325struct buffer_data_read_page {
326 unsigned order; /* order of the page */
327 struct buffer_data_page *data; /* actual data, stored in this page */
328};
329
77ae365e
SR
330/*
331 * Note, the buffer_page list must be first. The buffer pages
332 * are allocated in cache lines, which means that each buffer
333 * page will be at the beginning of a cache line, and thus
334 * the least significant bits will be zero. We use this to
335 * add flags in the list struct pointers, to make the ring buffer
336 * lockless.
337 */
abc9b56d 338struct buffer_page {
778c55d4 339 struct list_head list; /* list of buffer pages */
abc9b56d 340 local_t write; /* index for next write */
6f807acd 341 unsigned read; /* index for next read */
778c55d4 342 local_t entries; /* entries on this page */
ff0ff84a 343 unsigned long real_end; /* real end of data */
f9b94daa 344 unsigned order; /* order of the page */
117c3920 345 u32 id; /* ID for external mapping */
abc9b56d 346 struct buffer_data_page *page; /* Actual data page */
7a8e76a3
SR
347};
348
77ae365e
SR
349/*
350 * The buffer page counters, write and entries, must be reset
351 * atomically when crossing page boundaries. To synchronize this
352 * update, two counters are inserted into the number. One is
353 * the actual counter for the write position or count on the page.
354 *
355 * The other is a counter of updaters. Before an update happens
356 * the update partition of the counter is incremented. This will
357 * allow the updater to update the counter atomically.
358 *
359 * The counter is 20 bits, and the state data is 12.
360 */
361#define RB_WRITE_MASK 0xfffff
362#define RB_WRITE_INTCNT (1 << 20)
363
044fa782 364static void rb_init_page(struct buffer_data_page *bpage)
abc9b56d 365{
044fa782 366 local_set(&bpage->commit, 0);
abc9b56d
SR
367}
368
45d99ea4
ZY
369static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
370{
371 return local_read(&bpage->page->commit);
372}
373
34a148bf 374static void free_buffer_page(struct buffer_page *bpage)
ed56829c 375{
f9b94daa 376 free_pages((unsigned long)bpage->page, bpage->order);
e4c2ce82 377 kfree(bpage);
ed56829c
SR
378}
379
7a8e76a3
SR
380/*
381 * We need to fit the time_stamp delta into 27 bits.
382 */
bc92b956 383static inline bool test_time_stamp(u64 delta)
7a8e76a3 384{
bc92b956 385 return !!(delta & TS_DELTA_TEST);
7a8e76a3
SR
386}
387
15693458
SRRH
388struct rb_irq_work {
389 struct irq_work work;
390 wait_queue_head_t waiters;
1e0d6714 391 wait_queue_head_t full_waiters;
b70f2938 392 atomic_t seq;
15693458 393 bool waiters_pending;
1e0d6714
SRRH
394 bool full_waiters_pending;
395 bool wakeup_full;
15693458
SRRH
396};
397
fcc742ea
SRRH
398/*
399 * Structure to hold event state and handle nested events.
400 */
401struct rb_event_info {
402 u64 ts;
403 u64 delta;
58fbc3c6
SRV
404 u64 before;
405 u64 after;
fcc742ea
SRRH
406 unsigned long length;
407 struct buffer_page *tail_page;
408 int add_timestamp;
409};
410
a389d86f
SRV
411/*
412 * Used for the add_timestamp
413 * NONE
7c4b4a51
SRV
414 * EXTEND - wants a time extend
415 * ABSOLUTE - the buffer requests all events to have absolute time stamps
a389d86f
SRV
416 * FORCE - force a full time stamp.
417 */
418enum {
7c4b4a51
SRV
419 RB_ADD_STAMP_NONE = 0,
420 RB_ADD_STAMP_EXTEND = BIT(1),
421 RB_ADD_STAMP_ABSOLUTE = BIT(2),
422 RB_ADD_STAMP_FORCE = BIT(3)
a389d86f 423};
a497adb4
SRRH
424/*
425 * Used for which event context the event is in.
b02414c8
SRV
426 * TRANSITION = 0
427 * NMI = 1
428 * IRQ = 2
429 * SOFTIRQ = 3
430 * NORMAL = 4
a497adb4
SRRH
431 *
432 * See trace_recursive_lock() comment below for more details.
433 */
434enum {
b02414c8 435 RB_CTX_TRANSITION,
a497adb4
SRRH
436 RB_CTX_NMI,
437 RB_CTX_IRQ,
438 RB_CTX_SOFTIRQ,
439 RB_CTX_NORMAL,
440 RB_CTX_MAX
441};
442
10464b4a
SRV
443struct rb_time_struct {
444 local64_t time;
445};
10464b4a
SRV
446typedef struct rb_time_struct rb_time_t;
447
8672e494
SRV
448#define MAX_NEST 5
449
7a8e76a3
SR
450/*
451 * head_page == tail_page && head == tail then buffer is empty.
452 */
453struct ring_buffer_per_cpu {
454 int cpu;
985023de 455 atomic_t record_disabled;
07b8b10e 456 atomic_t resize_disabled;
13292494 457 struct trace_buffer *buffer;
5389f6fa 458 raw_spinlock_t reader_lock; /* serialize readers */
445c8951 459 arch_spinlock_t lock;
7a8e76a3 460 struct lock_class_key lock_key;
73a757e6 461 struct buffer_data_page *free_page;
9b94a8fb 462 unsigned long nr_pages;
58a09ec6 463 unsigned int current_context;
3adc54fa 464 struct list_head *pages;
6f807acd
SR
465 struct buffer_page *head_page; /* read from head */
466 struct buffer_page *tail_page; /* write to tail */
c3706f00 467 struct buffer_page *commit_page; /* committed pages */
d769041f 468 struct buffer_page *reader_page;
66a8cb95
SR
469 unsigned long lost_events;
470 unsigned long last_overrun;
8e012066 471 unsigned long nest;
c64e148a 472 local_t entries_bytes;
e4906eff 473 local_t entries;
884bfe89
SP
474 local_t overrun;
475 local_t commit_overrun;
476 local_t dropped_events;
fa743953
SR
477 local_t committing;
478 local_t commits;
2c2b0a78 479 local_t pages_touched;
31029a8b 480 local_t pages_lost;
2c2b0a78 481 local_t pages_read;
03329f99 482 long last_pages_touch;
2c2b0a78 483 size_t shortest_full;
77ae365e 484 unsigned long read;
c64e148a 485 unsigned long read_bytes;
10464b4a
SRV
486 rb_time_t write_stamp;
487 rb_time_t before_stamp;
8672e494 488 u64 event_stamp[MAX_NEST];
7a8e76a3 489 u64 read_stamp;
2d093282
ZY
490 /* pages removed since last reset */
491 unsigned long pages_removed;
117c3920
VD
492
493 unsigned int mapped;
494 struct mutex mapping_lock;
495 unsigned long *subbuf_ids; /* ID to subbuf VA */
496 struct trace_buffer_meta *meta_page;
497
438ced17 498 /* ring buffer pages to update, > 0 to add, < 0 to remove */
9b94a8fb 499 long nr_pages_to_update;
438ced17 500 struct list_head new_pages; /* new pages to add */
83f40318 501 struct work_struct update_pages_work;
05fdd70d 502 struct completion update_done;
15693458
SRRH
503
504 struct rb_irq_work irq_work;
7a8e76a3
SR
505};
506
13292494 507struct trace_buffer {
7a8e76a3
SR
508 unsigned flags;
509 int cpus;
7a8e76a3 510 atomic_t record_disabled;
8a96c028 511 atomic_t resizing;
00f62f61 512 cpumask_var_t cpumask;
7a8e76a3 513
1f8a6a10
PZ
514 struct lock_class_key *reader_lock_key;
515
7a8e76a3
SR
516 struct mutex mutex;
517
518 struct ring_buffer_per_cpu **buffers;
554f786e 519
b32614c0 520 struct hlist_node node;
37886f6a 521 u64 (*clock)(void);
15693458
SRRH
522
523 struct rb_irq_work irq_work;
00b41452 524 bool time_stamp_abs;
139f8400
TSV
525
526 unsigned int subbuf_size;
2808e31e 527 unsigned int subbuf_order;
139f8400 528 unsigned int max_data_size;
7a8e76a3
SR
529};
530
531struct ring_buffer_iter {
532 struct ring_buffer_per_cpu *cpu_buffer;
533 unsigned long head;
785888c5 534 unsigned long next_event;
7a8e76a3 535 struct buffer_page *head_page;
492a74f4
SR
536 struct buffer_page *cache_reader_page;
537 unsigned long cache_read;
2d093282 538 unsigned long cache_pages_removed;
7a8e76a3 539 u64 read_stamp;
28e3fc56 540 u64 page_stamp;
785888c5 541 struct ring_buffer_event *event;
139f8400 542 size_t event_size;
c9b7a4a7 543 int missed_events;
7a8e76a3
SR
544};
545
139f8400 546int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
10464b4a 547{
d5cfbdfc 548 struct buffer_data_page field;
10464b4a 549
d5cfbdfc
TSV
550 trace_seq_printf(s, "\tfield: u64 timestamp;\t"
551 "offset:0;\tsize:%u;\tsigned:%u;\n",
552 (unsigned int)sizeof(field.time_stamp),
553 (unsigned int)is_signed_type(u64));
10464b4a 554
d5cfbdfc
TSV
555 trace_seq_printf(s, "\tfield: local_t commit;\t"
556 "offset:%u;\tsize:%u;\tsigned:%u;\n",
557 (unsigned int)offsetof(typeof(field), commit),
558 (unsigned int)sizeof(field.commit),
559 (unsigned int)is_signed_type(long));
10464b4a 560
d5cfbdfc
TSV
561 trace_seq_printf(s, "\tfield: int overwrite;\t"
562 "offset:%u;\tsize:%u;\tsigned:%u;\n",
563 (unsigned int)offsetof(typeof(field), commit),
564 1,
565 (unsigned int)is_signed_type(long));
10464b4a 566
d5cfbdfc
TSV
567 trace_seq_printf(s, "\tfield: char data;\t"
568 "offset:%u;\tsize:%u;\tsigned:%u;\n",
569 (unsigned int)offsetof(typeof(field), data),
139f8400 570 (unsigned int)buffer->subbuf_size,
d5cfbdfc 571 (unsigned int)is_signed_type(char));
10464b4a 572
d5cfbdfc 573 return !trace_seq_has_overflowed(s);
10464b4a
SRV
574}
575
c84897c0 576static inline void rb_time_read(rb_time_t *t, u64 *ret)
10464b4a
SRV
577{
578 *ret = local64_read(&t->time);
10464b4a
SRV
579}
580static void rb_time_set(rb_time_t *t, u64 val)
581{
582 local64_set(&t->time, val);
583}
10464b4a 584
a948c69d
SRV
585/*
586 * Enable this to make sure that the event passed to
587 * ring_buffer_event_time_stamp() is not committed and also
588 * is on the buffer that it passed in.
589 */
590//#define RB_VERIFY_EVENT
591#ifdef RB_VERIFY_EVENT
592static struct list_head *rb_list_head(struct list_head *list);
593static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
594 void *event)
595{
596 struct buffer_page *page = cpu_buffer->commit_page;
597 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
598 struct list_head *next;
599 long commit, write;
600 unsigned long addr = (unsigned long)event;
601 bool done = false;
602 int stop = 0;
603
604 /* Make sure the event exists and is not committed yet */
605 do {
606 if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
607 done = true;
608 commit = local_read(&page->page->commit);
609 write = local_read(&page->write);
610 if (addr >= (unsigned long)&page->page->data[commit] &&
611 addr < (unsigned long)&page->page->data[write])
612 return;
613
614 next = rb_list_head(page->list.next);
615 page = list_entry(next, struct buffer_page, list);
616 } while (!done);
617 WARN_ON_ONCE(1);
618}
619#else
620static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
621 void *event)
622{
623}
624#endif
625
6695da58
SRG
626/*
627 * The absolute time stamp drops the 5 MSBs and some clocks may
628 * require them. The rb_fix_abs_ts() will take a previous full
629 * time stamp, and add the 5 MSB of that time stamp on to the
630 * saved absolute time stamp. Then they are compared in case of
631 * the unlikely event that the latest time stamp incremented
632 * the 5 MSB.
633 */
634static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
635{
636 if (save_ts & TS_MSB) {
637 abs |= save_ts & TS_MSB;
638 /* Check for overflow */
639 if (unlikely(abs < save_ts))
640 abs += 1ULL << 59;
641 }
642 return abs;
643}
a948c69d 644
efe6196a
SRV
645static inline u64 rb_time_stamp(struct trace_buffer *buffer);
646
647/**
648 * ring_buffer_event_time_stamp - return the event's current time stamp
649 * @buffer: The buffer that the event is on
650 * @event: the event to get the time stamp of
651 *
652 * Note, this must be called after @event is reserved, and before it is
653 * committed to the ring buffer. And must be called from the same
654 * context where the event was reserved (normal, softirq, irq, etc).
655 *
656 * Returns the time stamp associated with the current event.
657 * If the event has an extended time stamp, then that is used as
658 * the time stamp to return.
659 * In the highly unlikely case that the event was nested more than
660 * the max nesting, then the write_stamp of the buffer is returned,
661 * otherwise current time is returned, but that really neither of
662 * the last two cases should ever happen.
663 */
664u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
665 struct ring_buffer_event *event)
666{
667 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
668 unsigned int nest;
669 u64 ts;
670
671 /* If the event includes an absolute time, then just use that */
6695da58
SRG
672 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
673 ts = rb_event_time_stamp(event);
674 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
675 }
efe6196a 676
a948c69d
SRV
677 nest = local_read(&cpu_buffer->committing);
678 verify_event(cpu_buffer, event);
679 if (WARN_ON_ONCE(!nest))
680 goto fail;
681
efe6196a 682 /* Read the current saved nesting level time stamp */
a948c69d 683 if (likely(--nest < MAX_NEST))
efe6196a
SRV
684 return cpu_buffer->event_stamp[nest];
685
a948c69d
SRV
686 /* Shouldn't happen, warn if it does */
687 WARN_ONCE(1, "nest (%d) greater than max", nest);
efe6196a 688
a948c69d 689 fail:
c84897c0 690 rb_time_read(&cpu_buffer->write_stamp, &ts);
efe6196a
SRV
691
692 return ts;
693}
694
2c2b0a78
SRV
695/**
696 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
697 * @buffer: The ring_buffer to get the number of pages from
698 * @cpu: The cpu of the ring_buffer to get the number of pages from
699 *
700 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
701 */
13292494 702size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
2c2b0a78
SRV
703{
704 return buffer->buffers[cpu]->nr_pages;
705}
706
707/**
b7085b6f 708 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
2c2b0a78
SRV
709 * @buffer: The ring_buffer to get the number of pages from
710 * @cpu: The cpu of the ring_buffer to get the number of pages from
711 *
712 * Returns the number of pages that have content in the ring buffer.
713 */
13292494 714size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
2c2b0a78
SRV
715{
716 size_t read;
31029a8b 717 size_t lost;
2c2b0a78
SRV
718 size_t cnt;
719
720 read = local_read(&buffer->buffers[cpu]->pages_read);
31029a8b 721 lost = local_read(&buffer->buffers[cpu]->pages_lost);
2c2b0a78 722 cnt = local_read(&buffer->buffers[cpu]->pages_touched);
31029a8b
SRG
723
724 if (WARN_ON_ONCE(cnt < lost))
725 return 0;
726
727 cnt -= lost;
728
2c2b0a78
SRV
729 /* The reader can read an empty page, but not more than that */
730 if (cnt < read) {
731 WARN_ON_ONCE(read > cnt + 1);
732 return 0;
733 }
734
735 return cnt - read;
736}
737
42fb0a1e
SRG
738static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
739{
740 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
741 size_t nr_pages;
742 size_t dirty;
743
744 nr_pages = cpu_buffer->nr_pages;
745 if (!nr_pages || !full)
746 return true;
747
623b1f89
SRG
748 /*
749 * Add one as dirty will never equal nr_pages, as the sub-buffer
750 * that the writer is on is not counted as dirty.
751 * This is needed if "buffer_percent" is set to 100.
752 */
753 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;
42fb0a1e 754
623b1f89 755 return (dirty * 100) >= (full * nr_pages);
42fb0a1e
SRG
756}
757
15693458
SRRH
758/*
759 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
760 *
761 * Schedules a delayed work to wake up any task that is blocked on the
762 * ring buffer waiters queue.
763 */
764static void rb_wake_up_waiters(struct irq_work *work)
765{
766 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
767
b70f2938
SRG
768 /* For waiters waiting for the first wake up */
769 (void)atomic_fetch_inc_release(&rbwork->seq);
770
15693458 771 wake_up_all(&rbwork->waiters);
ec0bbc5e 772 if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
68282dd9
SRG
773 /* Only cpu_buffer sets the above flags */
774 struct ring_buffer_per_cpu *cpu_buffer =
775 container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
776
777 /* Called from interrupt context */
778 raw_spin_lock(&cpu_buffer->reader_lock);
1e0d6714 779 rbwork->wakeup_full = false;
ec0bbc5e 780 rbwork->full_waiters_pending = false;
68282dd9
SRG
781
782 /* Waking up all waiters, they will reset the shortest full */
783 cpu_buffer->shortest_full = 0;
784 raw_spin_unlock(&cpu_buffer->reader_lock);
785
1e0d6714
SRRH
786 wake_up_all(&rbwork->full_waiters);
787 }
15693458
SRRH
788}
789
7e9fbbb1
SRG
790/**
791 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
792 * @buffer: The ring buffer to wake waiters on
151e34d1 793 * @cpu: The CPU buffer to wake waiters on
7e9fbbb1
SRG
794 *
795 * In the case of a file that represents a ring buffer is closing,
796 * it is prudent to wake up any waiters that are on this.
797 */
798void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
799{
800 struct ring_buffer_per_cpu *cpu_buffer;
801 struct rb_irq_work *rbwork;
802
7433632c
SRG
803 if (!buffer)
804 return;
805
7e9fbbb1
SRG
806 if (cpu == RING_BUFFER_ALL_CPUS) {
807
808 /* Wake up individual ones too. One level recursion */
809 for_each_buffer_cpu(buffer, cpu)
810 ring_buffer_wake_waiters(buffer, cpu);
811
812 rbwork = &buffer->irq_work;
813 } else {
7433632c
SRG
814 if (WARN_ON_ONCE(!buffer->buffers))
815 return;
816 if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
817 return;
818
7e9fbbb1 819 cpu_buffer = buffer->buffers[cpu];
7433632c
SRG
820 /* The CPU buffer may not have been initialized yet */
821 if (!cpu_buffer)
822 return;
7e9fbbb1
SRG
823 rbwork = &cpu_buffer->irq_work;
824 }
825
39a7dc23
SRG
826 /* This can be called in any context */
827 irq_work_queue(&rbwork->work);
7e9fbbb1
SRG
828}
829
b3594573
SRG
830static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
831{
832 struct ring_buffer_per_cpu *cpu_buffer;
833 bool ret = false;
834
835 /* Reads of all CPUs always waits for any data */
836 if (cpu == RING_BUFFER_ALL_CPUS)
837 return !ring_buffer_empty(buffer);
838
839 cpu_buffer = buffer->buffers[cpu];
840
841 if (!ring_buffer_empty_cpu(buffer, cpu)) {
842 unsigned long flags;
843 bool pagebusy;
844
845 if (!full)
846 return true;
847
848 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
849 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
850 ret = !pagebusy && full_hit(buffer, cpu, full);
851
761d9473
SRG
852 if (!ret && (!cpu_buffer->shortest_full ||
853 cpu_buffer->shortest_full > full)) {
854 cpu_buffer->shortest_full = full;
855 }
b3594573
SRG
856 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
857 }
858 return ret;
859}
860
7af9ded0
SRG
861static inline bool
862rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
863 int cpu, int full, ring_buffer_cond_fn cond, void *data)
15693458 864{
7af9ded0
SRG
865 if (rb_watermark_hit(buffer, cpu, full))
866 return true;
15693458 867
7af9ded0
SRG
868 if (cond(data))
869 return true;
b3594573
SRG
870
871 /*
872 * The events can happen in critical sections where
873 * checking a work queue can cause deadlocks.
874 * After adding a task to the queue, this flag is set
875 * only to notify events to try to wake up the queue
876 * using irq_work.
877 *
878 * We don't clear it even if the buffer is no longer
879 * empty. The flag only causes the next event to run
880 * irq_work to do the work queue wake up. The worse
881 * that can happen if we race with !trace_empty() is that
882 * an event will cause an irq_work to try to wake up
883 * an empty queue.
884 *
885 * There's no reason to protect this flag either, as
886 * the work queue and irq_work logic will do the necessary
887 * synchronization for the wake ups. The only thing
888 * that is necessary is that the wake up happens after
889 * a task has been queued. It's OK for spurious wake ups.
890 */
891 if (full)
7af9ded0 892 rbwork->full_waiters_pending = true;
b3594573 893 else
7af9ded0 894 rbwork->waiters_pending = true;
b3594573 895
7af9ded0
SRG
896 return false;
897}
b3594573 898
b70f2938
SRG
899struct rb_wait_data {
900 struct rb_irq_work *irq_work;
901 int seq;
902};
903
7af9ded0
SRG
904/*
905 * The default wait condition for ring_buffer_wait() is to just to exit the
906 * wait loop the first time it is woken up.
907 */
908static bool rb_wait_once(void *data)
909{
b70f2938
SRG
910 struct rb_wait_data *rdata = data;
911 struct rb_irq_work *rbwork = rdata->irq_work;
7af9ded0 912
b70f2938 913 return atomic_read_acquire(&rbwork->seq) != rdata->seq;
7af9ded0 914}
b3594573 915
7af9ded0
SRG
916/**
917 * ring_buffer_wait - wait for input to the ring buffer
918 * @buffer: buffer to wait on
919 * @cpu: the cpu buffer to wait on
920 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
2aa043a5
SRG
921 * @cond: condition function to break out of wait (NULL to run once)
922 * @data: the data to pass to @cond.
7af9ded0
SRG
923 *
924 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
925 * as data is added to any of the @buffer's cpu buffers. Otherwise
926 * it will wait for data to be added to a specific cpu buffer.
927 */
2aa043a5
SRG
928int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
929 ring_buffer_cond_fn cond, void *data)
7af9ded0
SRG
930{
931 struct ring_buffer_per_cpu *cpu_buffer;
932 struct wait_queue_head *waitq;
7af9ded0 933 struct rb_irq_work *rbwork;
b70f2938 934 struct rb_wait_data rdata;
7af9ded0
SRG
935 int ret = 0;
936
7af9ded0
SRG
937 /*
938 * Depending on what the caller is waiting for, either any
939 * data in any cpu buffer, or a specific buffer, put the
940 * caller on the appropriate wait queue.
941 */
942 if (cpu == RING_BUFFER_ALL_CPUS) {
943 rbwork = &buffer->irq_work;
944 /* Full only makes sense on per cpu reads */
945 full = 0;
946 } else {
947 if (!cpumask_test_cpu(cpu, buffer->cpumask))
948 return -ENODEV;
949 cpu_buffer = buffer->buffers[cpu];
950 rbwork = &cpu_buffer->irq_work;
e30f53aa 951 }
15693458 952
1e0d6714 953 if (full)
7af9ded0 954 waitq = &rbwork->full_waiters;
1e0d6714 955 else
7af9ded0 956 waitq = &rbwork->waiters;
e30f53aa 957
b70f2938
SRG
958 /* Set up to exit loop as soon as it is woken */
959 if (!cond) {
960 cond = rb_wait_once;
961 rdata.irq_work = rbwork;
962 rdata.seq = atomic_read_acquire(&rbwork->seq);
963 data = &rdata;
964 }
965
7af9ded0
SRG
966 ret = wait_event_interruptible((*waitq),
967 rb_wait_cond(rbwork, buffer, cpu, full, cond, data));
b3594573 968
e30f53aa 969 return ret;
15693458
SRRH
970}
971
972/**
973 * ring_buffer_poll_wait - poll on buffer input
974 * @buffer: buffer to wait on
975 * @cpu: the cpu buffer to wait on
976 * @filp: the file descriptor
977 * @poll_table: The poll descriptor
42fb0a1e 978 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
15693458
SRRH
979 *
980 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
981 * as data is added to any of the @buffer's cpu buffers. Otherwise
982 * it will wait for data to be added to a specific cpu buffer.
983 *
a9a08845 984 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
15693458
SRRH
985 * zero otherwise.
986 */
13292494 987__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
42fb0a1e 988 struct file *filp, poll_table *poll_table, int full)
15693458
SRRH
989{
990 struct ring_buffer_per_cpu *cpu_buffer;
68282dd9 991 struct rb_irq_work *rbwork;
15693458 992
42fb0a1e 993 if (cpu == RING_BUFFER_ALL_CPUS) {
68282dd9 994 rbwork = &buffer->irq_work;
42fb0a1e
SRG
995 full = 0;
996 } else {
6721cb60 997 if (!cpumask_test_cpu(cpu, buffer->cpumask))
66bbea9e 998 return EPOLLERR;
6721cb60 999
15693458 1000 cpu_buffer = buffer->buffers[cpu];
68282dd9 1001 rbwork = &cpu_buffer->irq_work;
15693458
SRRH
1002 }
1003
42fb0a1e 1004 if (full) {
68282dd9
SRG
1005 poll_wait(filp, &rbwork->full_waiters, poll_table);
1006
e36f19a6 1007 if (rb_watermark_hit(buffer, cpu, full))
8145f1c3
SRG
1008 return EPOLLIN | EPOLLRDNORM;
1009 /*
1010 * Only allow full_waiters_pending update to be seen after
e36f19a6
SRG
1011 * the shortest_full is set (in rb_watermark_hit). If the
1012 * writer sees the full_waiters_pending flag set, it will
1013 * compare the amount in the ring buffer to shortest_full.
1014 * If the amount in the ring buffer is greater than the
1015 * shortest_full percent, it will call the irq_work handler
1016 * to wake up this list. The irq_handler will reset shortest_full
8145f1c3
SRG
1017 * back to zero. That's done under the reader_lock, but
1018 * the below smp_mb() makes sure that the update to
1019 * full_waiters_pending doesn't leak up into the above.
1020 */
1021 smp_mb();
68282dd9 1022 rbwork->full_waiters_pending = true;
8145f1c3 1023 return 0;
42fb0a1e
SRG
1024 }
1025
8145f1c3
SRG
1026 poll_wait(filp, &rbwork->waiters, poll_table);
1027 rbwork->waiters_pending = true;
1028
4ce97dbf
JB
1029 /*
1030 * There's a tight race between setting the waiters_pending and
1031 * checking if the ring buffer is empty. Once the waiters_pending bit
1032 * is set, the next event will wake the task up, but we can get stuck
1033 * if there's only a single event in.
1034 *
1035 * FIXME: Ideally, we need a memory barrier on the writer side as well,
1036 * but adding a memory barrier to all events will cause too much of a
1037 * performance hit in the fast path. We only need a memory barrier when
1038 * the buffer goes from empty to having content. But as this race is
1039 * extremely small, and it's not a problem if another event comes in, we
1040 * will fix it later.
1041 */
1042 smp_mb();
15693458
SRRH
1043
1044 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
1045 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
a9a08845 1046 return EPOLLIN | EPOLLRDNORM;
15693458
SRRH
1047 return 0;
1048}
1049
f536aafc 1050/* buffer may be either ring_buffer or ring_buffer_per_cpu */
077c5407
SR
1051#define RB_WARN_ON(b, cond) \
1052 ({ \
1053 int _____ret = unlikely(cond); \
1054 if (_____ret) { \
1055 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
1056 struct ring_buffer_per_cpu *__b = \
1057 (void *)b; \
1058 atomic_inc(&__b->buffer->record_disabled); \
1059 } else \
1060 atomic_inc(&b->record_disabled); \
1061 WARN_ON(1); \
1062 } \
1063 _____ret; \
3e89c7bb 1064 })
f536aafc 1065
37886f6a
SR
1066/* Up this if you want to test the TIME_EXTENTS and normalization */
1067#define DEBUG_SHIFT 0
1068
13292494 1069static inline u64 rb_time_stamp(struct trace_buffer *buffer)
88eb0125 1070{
bbeba3e5
SRV
1071 u64 ts;
1072
1073 /* Skip retpolines :-( */
aefb2f2e 1074 if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
bbeba3e5
SRV
1075 ts = trace_clock_local();
1076 else
1077 ts = buffer->clock();
1078
88eb0125 1079 /* shift to debug/test normalization and TIME_EXTENTS */
bbeba3e5 1080 return ts << DEBUG_SHIFT;
88eb0125
SR
1081}
1082
f3ef7202 1083u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
37886f6a
SR
1084{
1085 u64 time;
1086
1087 preempt_disable_notrace();
6d3f1e12 1088 time = rb_time_stamp(buffer);
d6097c9e 1089 preempt_enable_notrace();
37886f6a
SR
1090
1091 return time;
1092}
1093EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
1094
13292494 1095void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
37886f6a
SR
1096 int cpu, u64 *ts)
1097{
1098 /* Just stupid testing the normalize function and deltas */
1099 *ts >>= DEBUG_SHIFT;
1100}
1101EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
1102
77ae365e
SR
1103/*
1104 * Making the ring buffer lockless makes things tricky.
1105 * Although writes only happen on the CPU that they are on,
1106 * and they only need to worry about interrupts. Reads can
1107 * happen on any CPU.
1108 *
1109 * The reader page is always off the ring buffer, but when the
1110 * reader finishes with a page, it needs to swap its page with
1111 * a new one from the buffer. The reader needs to take from
1112 * the head (writes go to the tail). But if a writer is in overwrite
1113 * mode and wraps, it must push the head page forward.
1114 *
1115 * Here lies the problem.
1116 *
1117 * The reader must be careful to replace only the head page, and
1118 * not another one. As described at the top of the file in the
1119 * ASCII art, the reader sets its old page to point to the next
1120 * page after head. It then sets the page after head to point to
1121 * the old reader page. But if the writer moves the head page
1122 * during this operation, the reader could end up with the tail.
1123 *
1124 * We use cmpxchg to help prevent this race. We also do something
1125 * special with the page before head. We set the LSB to 1.
1126 *
1127 * When the writer must push the page forward, it will clear the
1128 * bit that points to the head page, move the head, and then set
1129 * the bit that points to the new head page.
1130 *
1131 * We also don't want an interrupt coming in and moving the head
1132 * page on another writer. Thus we use the second LSB to catch
1133 * that too. Thus:
1134 *
1135 * head->list->prev->next bit 1 bit 0
1136 * ------- -------
1137 * Normal page 0 0
1138 * Points to head page 0 1
1139 * New head page 1 0
1140 *
1141 * Note we can not trust the prev pointer of the head page, because:
1142 *
1143 * +----+ +-----+ +-----+
1144 * | |------>| T |---X--->| N |
1145 * | |<------| | | |
1146 * +----+ +-----+ +-----+
1147 * ^ ^ |
1148 * | +-----+ | |
1149 * +----------| R |----------+ |
1150 * | |<-----------+
1151 * +-----+
1152 *
1153 * Key: ---X--> HEAD flag set in pointer
1154 * T Tail page
1155 * R Reader page
1156 * N Next page
1157 *
1158 * (see __rb_reserve_next() to see where this happens)
1159 *
1160 * What the above shows is that the reader just swapped out
1161 * the reader page with a page in the buffer, but before it
1162 * could make the new header point back to the new page added
1163 * it was preempted by a writer. The writer moved forward onto
1164 * the new page added by the reader and is about to move forward
1165 * again.
1166 *
1167 * You can see, it is legitimate for the previous pointer of
1168 * the head (or any page) not to point back to itself. But only
6167c205 1169 * temporarily.
77ae365e
SR
1170 */
1171
1172#define RB_PAGE_NORMAL 0UL
1173#define RB_PAGE_HEAD 1UL
1174#define RB_PAGE_UPDATE 2UL
1175
1176
1177#define RB_FLAG_MASK 3UL
1178
1179/* PAGE_MOVED is not part of the mask */
1180#define RB_PAGE_MOVED 4UL
1181
1182/*
1183 * rb_list_head - remove any bit
1184 */
1185static struct list_head *rb_list_head(struct list_head *list)
1186{
1187 unsigned long val = (unsigned long)list;
1188
1189 return (struct list_head *)(val & ~RB_FLAG_MASK);
1190}
1191
1192/*
6d3f1e12 1193 * rb_is_head_page - test if the given page is the head page
77ae365e
SR
1194 *
1195 * Because the reader may move the head_page pointer, we can
1196 * not trust what the head page is (it may be pointing to
1197 * the reader page). But if the next page is a header page,
1198 * its flags will be non zero.
1199 */
42b16b3f 1200static inline int
6689bed3 1201rb_is_head_page(struct buffer_page *page, struct list_head *list)
77ae365e
SR
1202{
1203 unsigned long val;
1204
1205 val = (unsigned long)list->next;
1206
1207 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
1208 return RB_PAGE_MOVED;
1209
1210 return val & RB_FLAG_MASK;
1211}
1212
1213/*
1214 * rb_is_reader_page
1215 *
1216 * The unique thing about the reader page, is that, if the
1217 * writer is ever on it, the previous pointer never points
1218 * back to the reader page.
1219 */
06ca3209 1220static bool rb_is_reader_page(struct buffer_page *page)
77ae365e
SR
1221{
1222 struct list_head *list = page->list.prev;
1223
1224 return rb_list_head(list->next) != &page->list;
1225}
1226
1227/*
1228 * rb_set_list_to_head - set a list_head to be pointing to head.
1229 */
6689bed3 1230static void rb_set_list_to_head(struct list_head *list)
77ae365e
SR
1231{
1232 unsigned long *ptr;
1233
1234 ptr = (unsigned long *)&list->next;
1235 *ptr |= RB_PAGE_HEAD;
1236 *ptr &= ~RB_PAGE_UPDATE;
1237}
1238
1239/*
1240 * rb_head_page_activate - sets up head page
1241 */
1242static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
1243{
1244 struct buffer_page *head;
1245
1246 head = cpu_buffer->head_page;
1247 if (!head)
1248 return;
1249
1250 /*
1251 * Set the previous list pointer to have the HEAD flag.
1252 */
6689bed3 1253 rb_set_list_to_head(head->list.prev);
77ae365e
SR
1254}
1255
1256static void rb_list_head_clear(struct list_head *list)
1257{
1258 unsigned long *ptr = (unsigned long *)&list->next;
1259
1260 *ptr &= ~RB_FLAG_MASK;
1261}
1262
1263/*
6167c205 1264 * rb_head_page_deactivate - clears head page ptr (for free list)
77ae365e
SR
1265 */
1266static void
1267rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
1268{
1269 struct list_head *hd;
1270
1271 /* Go through the whole list and clear any pointers found. */
1272 rb_list_head_clear(cpu_buffer->pages);
1273
1274 list_for_each(hd, cpu_buffer->pages)
1275 rb_list_head_clear(hd);
1276}
1277
1278static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
1279 struct buffer_page *head,
1280 struct buffer_page *prev,
1281 int old_flag, int new_flag)
1282{
1283 struct list_head *list;
1284 unsigned long val = (unsigned long)&head->list;
1285 unsigned long ret;
1286
1287 list = &prev->list;
1288
1289 val &= ~RB_FLAG_MASK;
1290
08a40816
SR
1291 ret = cmpxchg((unsigned long *)&list->next,
1292 val | old_flag, val | new_flag);
77ae365e
SR
1293
1294 /* check if the reader took the page */
1295 if ((ret & ~RB_FLAG_MASK) != val)
1296 return RB_PAGE_MOVED;
1297
1298 return ret & RB_FLAG_MASK;
1299}
1300
1301static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
1302 struct buffer_page *head,
1303 struct buffer_page *prev,
1304 int old_flag)
1305{
1306 return rb_head_page_set(cpu_buffer, head, prev,
1307 old_flag, RB_PAGE_UPDATE);
1308}
1309
1310static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
1311 struct buffer_page *head,
1312 struct buffer_page *prev,
1313 int old_flag)
1314{
1315 return rb_head_page_set(cpu_buffer, head, prev,
1316 old_flag, RB_PAGE_HEAD);
1317}
1318
1319static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
1320 struct buffer_page *head,
1321 struct buffer_page *prev,
1322 int old_flag)
1323{
1324 return rb_head_page_set(cpu_buffer, head, prev,
1325 old_flag, RB_PAGE_NORMAL);
1326}
1327
6689bed3 1328static inline void rb_inc_page(struct buffer_page **bpage)
77ae365e
SR
1329{
1330 struct list_head *p = rb_list_head((*bpage)->list.next);
1331
1332 *bpage = list_entry(p, struct buffer_page, list);
1333}
1334
1335static struct buffer_page *
1336rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
1337{
1338 struct buffer_page *head;
1339 struct buffer_page *page;
1340 struct list_head *list;
1341 int i;
1342
1343 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
1344 return NULL;
1345
1346 /* sanity check */
1347 list = cpu_buffer->pages;
1348 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
1349 return NULL;
1350
1351 page = head = cpu_buffer->head_page;
1352 /*
1353 * It is possible that the writer moves the header behind
1354 * where we started, and we miss in one loop.
1355 * A second loop should grab the header, but we'll do
1356 * three loops just because I'm paranoid.
1357 */
1358 for (i = 0; i < 3; i++) {
1359 do {
6689bed3 1360 if (rb_is_head_page(page, page->list.prev)) {
77ae365e
SR
1361 cpu_buffer->head_page = page;
1362 return page;
1363 }
6689bed3 1364 rb_inc_page(&page);
77ae365e
SR
1365 } while (page != head);
1366 }
1367
1368 RB_WARN_ON(cpu_buffer, 1);
1369
1370 return NULL;
1371}
1372
bc92b956 1373static bool rb_head_page_replace(struct buffer_page *old,
77ae365e
SR
1374 struct buffer_page *new)
1375{
1376 unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1377 unsigned long val;
77ae365e
SR
1378
1379 val = *ptr & ~RB_FLAG_MASK;
1380 val |= RB_PAGE_HEAD;
1381
00a8478f 1382 return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
77ae365e
SR
1383}
1384
1385/*
1386 * rb_tail_page_update - move the tail page forward
77ae365e 1387 */
70004986 1388static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
77ae365e
SR
1389 struct buffer_page *tail_page,
1390 struct buffer_page *next_page)
1391{
77ae365e
SR
1392 unsigned long old_entries;
1393 unsigned long old_write;
77ae365e
SR
1394
1395 /*
1396 * The tail page now needs to be moved forward.
1397 *
1398 * We need to reset the tail page, but without messing
1399 * with possible erasing of data brought in by interrupts
1400 * that have moved the tail page and are currently on it.
1401 *
1402 * We add a counter to the write field to denote this.
1403 */
1404 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1405 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1406
1407 /*
1408 * Just make sure we have seen our old_write and synchronize
1409 * with any interrupts that come in.
1410 */
1411 barrier();
1412
1413 /*
1414 * If the tail page is still the same as what we think
1415 * it is, then it is up to us to update the tail
1416 * pointer.
1417 */
8573636e 1418 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
77ae365e
SR
1419 /* Zero the write counter */
1420 unsigned long val = old_write & ~RB_WRITE_MASK;
1421 unsigned long eval = old_entries & ~RB_WRITE_MASK;
1422
1423 /*
1424 * This will only succeed if an interrupt did
1425 * not come in and change it. In which case, we
1426 * do not want to modify it.
da706d8b
LJ
1427 *
1428 * We add (void) to let the compiler know that we do not care
1429 * about the return value of these functions. We use the
1430 * cmpxchg to only update if an interrupt did not already
1431 * do it for us. If the cmpxchg fails, we don't care.
77ae365e 1432 */
da706d8b
LJ
1433 (void)local_cmpxchg(&next_page->write, old_write, val);
1434 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
77ae365e
SR
1435
1436 /*
1437 * No need to worry about races with clearing out the commit.
1438 * it only can increment when a commit takes place. But that
1439 * only happens in the outer most nested commit.
1440 */
1441 local_set(&next_page->page->commit, 0);
1442
ffe3986f
SRG
1443 /* Either we update tail_page or an interrupt does */
1444 if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
1445 local_inc(&cpu_buffer->pages_touched);
77ae365e 1446 }
77ae365e
SR
1447}
1448
b4b55dfd 1449static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
77ae365e
SR
1450 struct buffer_page *bpage)
1451{
1452 unsigned long val = (unsigned long)bpage;
1453
b4b55dfd 1454 RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
77ae365e
SR
1455}
1456
7a8e76a3 1457/**
d611851b 1458 * rb_check_pages - integrity check of buffer pages
7a8e76a3
SR
1459 * @cpu_buffer: CPU buffer with pages to test
1460 *
c3706f00 1461 * As a safety measure we check to make sure the data pages have not
7a8e76a3 1462 * been corrupted.
c2274b90
PP
1463 *
1464 * Callers of this function need to guarantee that the list of pages doesn't get
1465 * modified during the check. In particular, if it's possible that the function
1466 * is invoked with concurrent readers which can swap in a new reader page then
1467 * the caller should take cpu_buffer->reader_lock.
7a8e76a3 1468 */
b4b55dfd 1469static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
7a8e76a3 1470{
8843e06f
MO
1471 struct list_head *head = rb_list_head(cpu_buffer->pages);
1472 struct list_head *tmp;
308f7eeb 1473
8843e06f
MO
1474 if (RB_WARN_ON(cpu_buffer,
1475 rb_list_head(rb_list_head(head->next)->prev) != head))
b4b55dfd 1476 return;
7a8e76a3 1477
8843e06f
MO
1478 if (RB_WARN_ON(cpu_buffer,
1479 rb_list_head(rb_list_head(head->prev)->next) != head))
b4b55dfd 1480 return;
77ae365e 1481
8843e06f 1482 for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
3e89c7bb 1483 if (RB_WARN_ON(cpu_buffer,
8843e06f 1484 rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
b4b55dfd 1485 return;
8843e06f 1486
3e89c7bb 1487 if (RB_WARN_ON(cpu_buffer,
8843e06f 1488 rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
b4b55dfd 1489 return;
7a8e76a3 1490 }
7a8e76a3
SR
1491}
1492
74e2afc6
QH
1493static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1494 long nr_pages, struct list_head *pages)
7a8e76a3 1495{
044fa782 1496 struct buffer_page *bpage, *tmp;
927e56db
SRV
1497 bool user_thread = current->mm != NULL;
1498 gfp_t mflags;
9b94a8fb 1499 long i;
3adc54fa 1500
927e56db
SRV
1501 /*
1502 * Check if the available memory is there first.
1503 * Note, si_mem_available() only gives us a rough estimate of available
1504 * memory. It may not be accurate. But we don't care, we just want
1505 * to prevent doing any allocation when it is obvious that it is
1506 * not going to succeed.
1507 */
2a872fa4
SRV
1508 i = si_mem_available();
1509 if (i < nr_pages)
1510 return -ENOMEM;
1511
927e56db
SRV
1512 /*
1513 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
1514 * gracefully without invoking oom-killer and the system is not
1515 * destabilized.
1516 */
1517 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
1518
1519 /*
1520 * If a user thread allocates too much, and si_mem_available()
1521 * reports there's enough memory, even though there is not.
1522 * Make sure the OOM killer kills this thread. This can happen
1523 * even with RETRY_MAYFAIL because another task may be doing
1524 * an allocation after this task has taken all memory.
1525 * This is the task the OOM killer needs to take out during this
1526 * loop, even if it was triggered by an allocation somewhere else.
1527 */
1528 if (user_thread)
1529 set_current_oom_origin();
7a8e76a3 1530 for (i = 0; i < nr_pages; i++) {
7ea59064 1531 struct page *page;
927e56db 1532
044fa782 1533 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
74e2afc6 1534 mflags, cpu_to_node(cpu_buffer->cpu));
044fa782 1535 if (!bpage)
e4c2ce82 1536 goto free_pages;
77ae365e 1537
74e2afc6
QH
1538 rb_check_bpage(cpu_buffer, bpage);
1539
438ced17 1540 list_add(&bpage->list, pages);
77ae365e 1541
6b76323e 1542 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
c09d4167 1543 mflags | __GFP_COMP | __GFP_ZERO,
f9b94daa 1544 cpu_buffer->buffer->subbuf_order);
7ea59064 1545 if (!page)
7a8e76a3 1546 goto free_pages;
7ea59064 1547 bpage->page = page_address(page);
f9b94daa 1548 bpage->order = cpu_buffer->buffer->subbuf_order;
044fa782 1549 rb_init_page(bpage->page);
927e56db
SRV
1550
1551 if (user_thread && fatal_signal_pending(current))
1552 goto free_pages;
7a8e76a3 1553 }
927e56db
SRV
1554 if (user_thread)
1555 clear_current_oom_origin();
7a8e76a3 1556
438ced17
VN
1557 return 0;
1558
1559free_pages:
1560 list_for_each_entry_safe(bpage, tmp, pages, list) {
1561 list_del_init(&bpage->list);
1562 free_buffer_page(bpage);
1563 }
927e56db
SRV
1564 if (user_thread)
1565 clear_current_oom_origin();
438ced17
VN
1566
1567 return -ENOMEM;
1568}
1569
1570static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
9b94a8fb 1571 unsigned long nr_pages)
438ced17
VN
1572{
1573 LIST_HEAD(pages);
1574
1575 WARN_ON(!nr_pages);
1576
74e2afc6 1577 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
438ced17
VN
1578 return -ENOMEM;
1579
3adc54fa
SR
1580 /*
1581 * The ring buffer page list is a circular list that does not
1582 * start and end with a list head. All page list items point to
1583 * other pages.
1584 */
1585 cpu_buffer->pages = pages.next;
1586 list_del(&pages);
7a8e76a3 1587
438ced17
VN
1588 cpu_buffer->nr_pages = nr_pages;
1589
7a8e76a3
SR
1590 rb_check_pages(cpu_buffer);
1591
1592 return 0;
7a8e76a3
SR
1593}
1594
1595static struct ring_buffer_per_cpu *
13292494 1596rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
7a8e76a3
SR
1597{
1598 struct ring_buffer_per_cpu *cpu_buffer;
044fa782 1599 struct buffer_page *bpage;
7ea59064 1600 struct page *page;
7a8e76a3
SR
1601 int ret;
1602
1603 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1604 GFP_KERNEL, cpu_to_node(cpu));
1605 if (!cpu_buffer)
1606 return NULL;
1607
1608 cpu_buffer->cpu = cpu;
1609 cpu_buffer->buffer = buffer;
5389f6fa 1610 raw_spin_lock_init(&cpu_buffer->reader_lock);
1f8a6a10 1611 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
edc35bd7 1612 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
83f40318 1613 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
05fdd70d 1614 init_completion(&cpu_buffer->update_done);
15693458 1615 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
f1dc6725 1616 init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1e0d6714 1617 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
117c3920 1618 mutex_init(&cpu_buffer->mapping_lock);
7a8e76a3 1619
044fa782 1620 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
e4c2ce82 1621 GFP_KERNEL, cpu_to_node(cpu));
044fa782 1622 if (!bpage)
e4c2ce82
SR
1623 goto fail_free_buffer;
1624
77ae365e
SR
1625 rb_check_bpage(cpu_buffer, bpage);
1626
044fa782 1627 cpu_buffer->reader_page = bpage;
f9b94daa 1628
c09d4167 1629 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO,
6b76323e 1630 cpu_buffer->buffer->subbuf_order);
7ea59064 1631 if (!page)
e4c2ce82 1632 goto fail_free_reader;
7ea59064 1633 bpage->page = page_address(page);
044fa782 1634 rb_init_page(bpage->page);
e4c2ce82 1635
d769041f 1636 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
44b99462 1637 INIT_LIST_HEAD(&cpu_buffer->new_pages);
d769041f 1638
438ced17 1639 ret = rb_allocate_pages(cpu_buffer, nr_pages);
7a8e76a3 1640 if (ret < 0)
d769041f 1641 goto fail_free_reader;
7a8e76a3
SR
1642
1643 cpu_buffer->head_page
3adc54fa 1644 = list_entry(cpu_buffer->pages, struct buffer_page, list);
bf41a158 1645 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
7a8e76a3 1646
77ae365e
SR
1647 rb_head_page_activate(cpu_buffer);
1648
7a8e76a3
SR
1649 return cpu_buffer;
1650
d769041f
SR
1651 fail_free_reader:
1652 free_buffer_page(cpu_buffer->reader_page);
1653
7a8e76a3
SR
1654 fail_free_buffer:
1655 kfree(cpu_buffer);
1656 return NULL;
1657}
1658
1659static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1660{
3adc54fa 1661 struct list_head *head = cpu_buffer->pages;
044fa782 1662 struct buffer_page *bpage, *tmp;
7a8e76a3 1663
675751bb
JB
1664 irq_work_sync(&cpu_buffer->irq_work.work);
1665
d769041f
SR
1666 free_buffer_page(cpu_buffer->reader_page);
1667
3adc54fa 1668 if (head) {
56f4ca0a
DT
1669 rb_head_page_deactivate(cpu_buffer);
1670
3adc54fa
SR
1671 list_for_each_entry_safe(bpage, tmp, head, list) {
1672 list_del_init(&bpage->list);
1673 free_buffer_page(bpage);
1674 }
1675 bpage = list_entry(head, struct buffer_page, list);
044fa782 1676 free_buffer_page(bpage);
7a8e76a3 1677 }
3adc54fa 1678
17d80175
SRG
1679 free_page((unsigned long)cpu_buffer->free_page);
1680
7a8e76a3
SR
1681 kfree(cpu_buffer);
1682}
1683
1684/**
d611851b 1685 * __ring_buffer_alloc - allocate a new ring_buffer
68814b58 1686 * @size: the size in bytes per cpu that is needed.
7a8e76a3 1687 * @flags: attributes to set for the ring buffer.
59e7cffe 1688 * @key: ring buffer reader_lock_key.
7a8e76a3
SR
1689 *
1690 * Currently the only flag that is available is the RB_FL_OVERWRITE
1691 * flag. This flag means that the buffer will overwrite old data
1692 * when the buffer wraps. If this flag is not set, the buffer will
1693 * drop data when the tail hits the head.
1694 */
13292494 1695struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1f8a6a10 1696 struct lock_class_key *key)
7a8e76a3 1697{
13292494 1698 struct trace_buffer *buffer;
9b94a8fb 1699 long nr_pages;
7a8e76a3 1700 int bsize;
9b94a8fb 1701 int cpu;
b32614c0 1702 int ret;
7a8e76a3
SR
1703
1704 /* keep it in its own cache line */
1705 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1706 GFP_KERNEL);
1707 if (!buffer)
1708 return NULL;
1709
b18cc3de 1710 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
9e01c1b7
RR
1711 goto fail_free_buffer;
1712
139f8400 1713 /* Default buffer page size - one system page */
f9b94daa 1714 buffer->subbuf_order = 0;
139f8400
TSV
1715 buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE;
1716
 1717	/* Max payload is buffer page size - header (8 bytes) */
1718 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2);
1719
1720 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
7a8e76a3 1721 buffer->flags = flags;
37886f6a 1722 buffer->clock = trace_clock_local;
1f8a6a10 1723 buffer->reader_lock_key = key;
7a8e76a3 1724
15693458 1725 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
f1dc6725 1726 init_waitqueue_head(&buffer->irq_work.waiters);
15693458 1727
7a8e76a3 1728 /* need at least two pages */
438ced17
VN
1729 if (nr_pages < 2)
1730 nr_pages = 2;
7a8e76a3 1731
7a8e76a3
SR
1732 buffer->cpus = nr_cpu_ids;
1733
1734 bsize = sizeof(void *) * nr_cpu_ids;
1735 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1736 GFP_KERNEL);
1737 if (!buffer->buffers)
9e01c1b7 1738 goto fail_free_cpumask;
7a8e76a3 1739
b32614c0
SAS
1740 cpu = raw_smp_processor_id();
1741 cpumask_set_cpu(cpu, buffer->cpumask);
1742 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1743 if (!buffer->buffers[cpu])
1744 goto fail_free_buffers;
7a8e76a3 1745
b32614c0
SAS
1746 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1747 if (ret < 0)
1748 goto fail_free_buffers;
554f786e 1749
7a8e76a3
SR
1750 mutex_init(&buffer->mutex);
1751
1752 return buffer;
1753
1754 fail_free_buffers:
1755 for_each_buffer_cpu(buffer, cpu) {
1756 if (buffer->buffers[cpu])
1757 rb_free_cpu_buffer(buffer->buffers[cpu]);
1758 }
1759 kfree(buffer->buffers);
1760
9e01c1b7
RR
1761 fail_free_cpumask:
1762 free_cpumask_var(buffer->cpumask);
1763
7a8e76a3
SR
1764 fail_free_buffer:
1765 kfree(buffer);
1766 return NULL;
1767}
1f8a6a10 1768EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
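/*
 * Illustrative usage sketch (not part of this file): callers normally go
 * through the ring_buffer_alloc() wrapper in <linux/ring_buffer.h>, which
 * supplies the lock_class_key for __ring_buffer_alloc(). The helper name
 * and the size value below are hypothetical examples; pair the allocation
 * with ring_buffer_free() when the buffer is no longer needed.
 */
static struct trace_buffer *example_buffer_setup(void)
{
	/* Request roughly 1MB per CPU, overwriting old data when full. */
	struct trace_buffer *buf = ring_buffer_alloc(1024 * 1024, RB_FL_OVERWRITE);

	if (!buf)
		return NULL;	/* cpumask or per-cpu buffer allocation failed */

	return buf;
}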
7a8e76a3
SR
1769
1770/**
1771 * ring_buffer_free - free a ring buffer.
1772 * @buffer: the buffer to free.
1773 */
1774void
13292494 1775ring_buffer_free(struct trace_buffer *buffer)
7a8e76a3
SR
1776{
1777 int cpu;
1778
b32614c0 1779 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
554f786e 1780
675751bb
JB
1781 irq_work_sync(&buffer->irq_work.work);
1782
7a8e76a3
SR
1783 for_each_buffer_cpu(buffer, cpu)
1784 rb_free_cpu_buffer(buffer->buffers[cpu]);
1785
bd3f0221 1786 kfree(buffer->buffers);
9e01c1b7
RR
1787 free_cpumask_var(buffer->cpumask);
1788
7a8e76a3
SR
1789 kfree(buffer);
1790}
c4f50183 1791EXPORT_SYMBOL_GPL(ring_buffer_free);
7a8e76a3 1792
13292494 1793void ring_buffer_set_clock(struct trace_buffer *buffer,
37886f6a
SR
1794 u64 (*clock)(void))
1795{
1796 buffer->clock = clock;
1797}
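/*
 * Usage sketch (hypothetical caller, not part of this file): switch the
 * buffer to the global trace clock from <linux/trace_clock.h>. Any
 * function with the u64 (*)(void) signature may be passed here.
 */
static void example_use_global_clock(struct trace_buffer *buffer)
{
	ring_buffer_set_clock(buffer, trace_clock_global);
}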
1798
13292494 1799void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
00b41452
TZ
1800{
1801 buffer->time_stamp_abs = abs;
1802}
1803
13292494 1804bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
00b41452
TZ
1805{
1806 return buffer->time_stamp_abs;
1807}
1808
83f40318
VN
1809static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1810{
1811 return local_read(&bpage->entries) & RB_WRITE_MASK;
1812}
1813
1814static inline unsigned long rb_page_write(struct buffer_page *bpage)
1815{
1816 return local_read(&bpage->write) & RB_WRITE_MASK;
1817}
1818
bc92b956 1819static bool
9b94a8fb 1820rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
7a8e76a3 1821{
83f40318
VN
1822 struct list_head *tail_page, *to_remove, *next_page;
1823 struct buffer_page *to_remove_page, *tmp_iter_page;
1824 struct buffer_page *last_page, *first_page;
9b94a8fb 1825 unsigned long nr_removed;
83f40318
VN
1826 unsigned long head_bit;
1827 int page_entries;
1828
1829 head_bit = 0;
7a8e76a3 1830
5389f6fa 1831 raw_spin_lock_irq(&cpu_buffer->reader_lock);
83f40318
VN
1832 atomic_inc(&cpu_buffer->record_disabled);
1833 /*
1834 * We don't race with the readers since we have acquired the reader
1835 * lock. We also don't race with writers after disabling recording.
1836 * This makes it easy to figure out the first and the last page to be
1837 * removed from the list. We unlink all the pages in between including
1838 * the first and last pages. This is done in a busy loop so that we
1839 * lose the least number of traces.
1840 * The pages are freed after we restart recording and unlock readers.
1841 */
1842 tail_page = &cpu_buffer->tail_page->list;
77ae365e 1843
83f40318
VN
1844 /*
 1845	 * The tail page might be on the reader page; if so, we remove the
 1846	 * next page from the ring buffer instead.
1847 */
1848 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1849 tail_page = rb_list_head(tail_page->next);
1850 to_remove = tail_page;
1851
1852 /* start of pages to remove */
1853 first_page = list_entry(rb_list_head(to_remove->next),
1854 struct buffer_page, list);
1855
1856 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1857 to_remove = rb_list_head(to_remove)->next;
1858 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
7a8e76a3 1859 }
2d093282
ZY
 1860	/* Read iterators need to reset themselves when some pages are removed */
1861 cpu_buffer->pages_removed += nr_removed;
7a8e76a3 1862
83f40318 1863 next_page = rb_list_head(to_remove)->next;
7a8e76a3 1864
83f40318
VN
1865 /*
1866 * Now we remove all pages between tail_page and next_page.
1867 * Make sure that we have head_bit value preserved for the
1868 * next page
1869 */
1870 tail_page->next = (struct list_head *)((unsigned long)next_page |
1871 head_bit);
1872 next_page = rb_list_head(next_page);
1873 next_page->prev = tail_page;
1874
1875 /* make sure pages points to a valid page in the ring buffer */
1876 cpu_buffer->pages = next_page;
1877
1878 /* update head page */
1879 if (head_bit)
1880 cpu_buffer->head_page = list_entry(next_page,
1881 struct buffer_page, list);
1882
83f40318
VN
1883 /* pages are removed, resume tracing and then free the pages */
1884 atomic_dec(&cpu_buffer->record_disabled);
5389f6fa 1885 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
83f40318
VN
1886
1887 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1888
1889 /* last buffer page to remove */
1890 last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1891 list);
1892 tmp_iter_page = first_page;
1893
1894 do {
83f36555
VN
1895 cond_resched();
1896
83f40318 1897 to_remove_page = tmp_iter_page;
6689bed3 1898 rb_inc_page(&tmp_iter_page);
83f40318
VN
1899
1900 /* update the counters */
1901 page_entries = rb_page_entries(to_remove_page);
1902 if (page_entries) {
1903 /*
1904 * If something was added to this page, it was full
1905 * since it is not the tail page. So we deduct the
1906 * bytes consumed in ring buffer from here.
48fdc72f 1907 * Increment overrun to account for the lost events.
83f40318 1908 */
48fdc72f 1909 local_add(page_entries, &cpu_buffer->overrun);
45d99ea4 1910 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
31029a8b 1911 local_inc(&cpu_buffer->pages_lost);
83f40318
VN
1912 }
1913
1914 /*
1915 * We have already removed references to this list item, just
1916 * free up the buffer_page and its page
1917 */
1918 free_buffer_page(to_remove_page);
1919 nr_removed--;
1920
1921 } while (to_remove_page != last_page);
1922
1923 RB_WARN_ON(cpu_buffer, nr_removed);
5040b4b7
VN
1924
1925 return nr_removed == 0;
7a8e76a3
SR
1926}
1927
bc92b956 1928static bool
5040b4b7 1929rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
7a8e76a3 1930{
5040b4b7 1931 struct list_head *pages = &cpu_buffer->new_pages;
88ca6a71 1932 unsigned long flags;
bc92b956
UB
1933 bool success;
1934 int retries;
7a8e76a3 1935
88ca6a71
SR
 1936	/* Can be called at early boot up, where interrupts must not be enabled */
1937 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5040b4b7
VN
1938 /*
1939 * We are holding the reader lock, so the reader page won't be swapped
1940 * in the ring buffer. Now we are racing with the writer trying to
1941 * move head page and the tail page.
1942 * We are going to adapt the reader page update process where:
1943 * 1. We first splice the start and end of list of new pages between
1944 * the head page and its previous page.
1945 * 2. We cmpxchg the prev_page->next to point from head page to the
1946 * start of new pages list.
1947 * 3. Finally, we update the head->prev to the end of new list.
1948 *
1949 * We will try this process 10 times, to make sure that we don't keep
1950 * spinning.
1951 */
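	/*
	 * Illustration of one successful attempt (added for clarity; "(H)"
	 * marks the head bit carried in the ->next pointer):
	 *
	 *   before cmpxchg:  prev_page ----------------(H)--> head_page
	 *   new list ready:  first_page ... last_page --(H)--> head_page
	 *   after cmpxchg:   prev_page --> first_page ... last_page --(H)--> head_page
	 *   final step:      head_page->prev = last_page
	 */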
1952 retries = 10;
bc92b956 1953 success = false;
5040b4b7 1954 while (retries--) {
bdf4fb62 1955 struct list_head *head_page, *prev_page;
5040b4b7
VN
1956 struct list_head *last_page, *first_page;
1957 struct list_head *head_page_with_bit;
625ed527 1958 struct buffer_page *hpage = rb_set_head_page(cpu_buffer);
77ae365e 1959
625ed527 1960 if (!hpage)
54f7be5b 1961 break;
625ed527 1962 head_page = &hpage->list;
5040b4b7
VN
1963 prev_page = head_page->prev;
1964
1965 first_page = pages->next;
1966 last_page = pages->prev;
1967
1968 head_page_with_bit = (struct list_head *)
1969 ((unsigned long)head_page | RB_PAGE_HEAD);
1970
1971 last_page->next = head_page_with_bit;
1972 first_page->prev = prev_page;
1973
bdf4fb62
UB
1974 /* caution: head_page_with_bit gets updated on cmpxchg failure */
1975 if (try_cmpxchg(&prev_page->next,
1976 &head_page_with_bit, first_page)) {
5040b4b7
VN
1977 /*
1978 * yay, we replaced the page pointer to our new list,
1979 * now, we just have to update to head page's prev
1980 * pointer to point to end of list
1981 */
1982 head_page->prev = last_page;
bc92b956 1983 success = true;
5040b4b7
VN
1984 break;
1985 }
7a8e76a3 1986 }
7a8e76a3 1987
5040b4b7
VN
1988 if (success)
1989 INIT_LIST_HEAD(pages);
1990 /*
 1991	 * If we weren't successful in adding the new pages, warn and stop
1992 * tracing
1993 */
1994 RB_WARN_ON(cpu_buffer, !success);
88ca6a71 1995 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5040b4b7
VN
1996
1997 /* free pages if they weren't inserted */
1998 if (!success) {
1999 struct buffer_page *bpage, *tmp;
2000 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2001 list) {
2002 list_del_init(&bpage->list);
2003 free_buffer_page(bpage);
2004 }
2005 }
2006 return success;
7a8e76a3
SR
2007}
2008
83f40318 2009static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
438ced17 2010{
bc92b956 2011 bool success;
5040b4b7 2012
438ced17 2013 if (cpu_buffer->nr_pages_to_update > 0)
5040b4b7 2014 success = rb_insert_pages(cpu_buffer);
438ced17 2015 else
5040b4b7
VN
2016 success = rb_remove_pages(cpu_buffer,
2017 -cpu_buffer->nr_pages_to_update);
83f40318 2018
5040b4b7
VN
2019 if (success)
2020 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
83f40318
VN
2021}
2022
2023static void update_pages_handler(struct work_struct *work)
2024{
2025 struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
2026 struct ring_buffer_per_cpu, update_pages_work);
2027 rb_update_pages(cpu_buffer);
05fdd70d 2028 complete(&cpu_buffer->update_done);
438ced17
VN
2029}
2030
7a8e76a3
SR
2031/**
2032 * ring_buffer_resize - resize the ring buffer
2033 * @buffer: the buffer to resize.
2034 * @size: the new size.
d611851b 2035 * @cpu_id: the cpu buffer to resize
7a8e76a3 2036 *
139f8400 2037 * Minimum size is 2 * buffer->subbuf_size.
7a8e76a3 2038 *
83f40318 2039 * Returns 0 on success and < 0 on failure.
7a8e76a3 2040 */
13292494 2041int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
438ced17 2042 int cpu_id)
7a8e76a3
SR
2043{
2044 struct ring_buffer_per_cpu *cpu_buffer;
9b94a8fb 2045 unsigned long nr_pages;
0a1754b2 2046 int cpu, err;
7a8e76a3 2047
ee51a1de
IM
2048 /*
2049 * Always succeed at resizing a non-existent buffer:
2050 */
2051 if (!buffer)
0a1754b2 2052 return 0;
ee51a1de 2053
6a31e1f1
SR
2054 /* Make sure the requested buffer exists */
2055 if (cpu_id != RING_BUFFER_ALL_CPUS &&
2056 !cpumask_test_cpu(cpu_id, buffer->cpumask))
0a1754b2 2057 return 0;
6a31e1f1 2058
139f8400 2059 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
7a8e76a3
SR
2060
2061 /* we need a minimum of two pages */
59643d15
SRRH
2062 if (nr_pages < 2)
2063 nr_pages = 2;
7a8e76a3 2064
83f40318 2065 /* prevent another thread from changing buffer sizes */
7a8e76a3 2066 mutex_lock(&buffer->mutex);
8a96c028 2067 atomic_inc(&buffer->resizing);
07b8b10e 2068
438ced17 2069 if (cpu_id == RING_BUFFER_ALL_CPUS) {
07b8b10e
SRV
2070 /*
2071 * Don't succeed if resizing is disabled, as a reader might be
2072 * manipulating the ring buffer and is expecting a sane state while
2073 * this is true.
2074 */
2075 for_each_buffer_cpu(buffer, cpu) {
2076 cpu_buffer = buffer->buffers[cpu];
2077 if (atomic_read(&cpu_buffer->resize_disabled)) {
2078 err = -EBUSY;
2079 goto out_err_unlock;
2080 }
2081 }
2082
438ced17 2083 /* calculate the pages to update */
7a8e76a3
SR
2084 for_each_buffer_cpu(buffer, cpu) {
2085 cpu_buffer = buffer->buffers[cpu];
7a8e76a3 2086
438ced17
VN
2087 cpu_buffer->nr_pages_to_update = nr_pages -
2088 cpu_buffer->nr_pages;
438ced17
VN
2089 /*
 2090		 * Nothing more to do when removing pages, or when there is no update.
2091 */
2092 if (cpu_buffer->nr_pages_to_update <= 0)
2093 continue;
d7ec4bfe 2094 /*
438ced17
VN
2095 * to add pages, make sure all new pages can be
2096 * allocated without receiving ENOMEM
d7ec4bfe 2097 */
438ced17 2098 INIT_LIST_HEAD(&cpu_buffer->new_pages);
74e2afc6
QH
2099 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2100 &cpu_buffer->new_pages)) {
438ced17 2101 /* not enough memory for new pages */
83f40318
VN
2102 err = -ENOMEM;
2103 goto out_err;
2104 }
f6bd2c92
ZY
2105
2106 cond_resched();
83f40318
VN
2107 }
2108
99c37d1a 2109 cpus_read_lock();
83f40318
VN
2110 /*
2111 * Fire off all the required work handlers
05fdd70d 2112 * We can't schedule on offline CPUs, but it's not necessary
83f40318
VN
2113 * since we can change their buffer sizes without any race.
2114 */
2115 for_each_buffer_cpu(buffer, cpu) {
2116 cpu_buffer = buffer->buffers[cpu];
05fdd70d 2117 if (!cpu_buffer->nr_pages_to_update)
83f40318
VN
2118 continue;
2119
021c5b34
CM
2120 /* Can't run something on an offline CPU. */
2121 if (!cpu_online(cpu)) {
f5eb5588
SRRH
2122 rb_update_pages(cpu_buffer);
2123 cpu_buffer->nr_pages_to_update = 0;
2124 } else {
88ca6a71
SR
2125 /* Run directly if possible. */
2126 migrate_disable();
2127 if (cpu != smp_processor_id()) {
2128 migrate_enable();
2129 schedule_work_on(cpu,
2130 &cpu_buffer->update_pages_work);
2131 } else {
2132 update_pages_handler(&cpu_buffer->update_pages_work);
2133 migrate_enable();
2134 }
f5eb5588 2135 }
7a8e76a3 2136 }
7a8e76a3 2137
438ced17
VN
2138 /* wait for all the updates to complete */
2139 for_each_buffer_cpu(buffer, cpu) {
2140 cpu_buffer = buffer->buffers[cpu];
05fdd70d 2141 if (!cpu_buffer->nr_pages_to_update)
83f40318
VN
2142 continue;
2143
05fdd70d
VN
2144 if (cpu_online(cpu))
2145 wait_for_completion(&cpu_buffer->update_done);
83f40318 2146 cpu_buffer->nr_pages_to_update = 0;
438ced17 2147 }
83f40318 2148
99c37d1a 2149 cpus_read_unlock();
438ced17
VN
2150 } else {
2151 cpu_buffer = buffer->buffers[cpu_id];
83f40318 2152
438ced17
VN
2153 if (nr_pages == cpu_buffer->nr_pages)
2154 goto out;
7a8e76a3 2155
07b8b10e
SRV
2156 /*
2157 * Don't succeed if resizing is disabled, as a reader might be
2158 * manipulating the ring buffer and is expecting a sane state while
2159 * this is true.
2160 */
2161 if (atomic_read(&cpu_buffer->resize_disabled)) {
2162 err = -EBUSY;
2163 goto out_err_unlock;
2164 }
2165
438ced17
VN
2166 cpu_buffer->nr_pages_to_update = nr_pages -
2167 cpu_buffer->nr_pages;
2168
2169 INIT_LIST_HEAD(&cpu_buffer->new_pages);
2170 if (cpu_buffer->nr_pages_to_update > 0 &&
74e2afc6
QH
2171 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2172 &cpu_buffer->new_pages)) {
83f40318
VN
2173 err = -ENOMEM;
2174 goto out_err;
2175 }
438ced17 2176
99c37d1a 2177 cpus_read_lock();
83f40318 2178
021c5b34
CM
2179 /* Can't run something on an offline CPU. */
2180 if (!cpu_online(cpu_id))
f5eb5588
SRRH
2181 rb_update_pages(cpu_buffer);
2182 else {
88ca6a71
SR
2183 /* Run directly if possible. */
2184 migrate_disable();
2185 if (cpu_id == smp_processor_id()) {
2186 rb_update_pages(cpu_buffer);
2187 migrate_enable();
2188 } else {
2189 migrate_enable();
2190 schedule_work_on(cpu_id,
2191 &cpu_buffer->update_pages_work);
2192 wait_for_completion(&cpu_buffer->update_done);
2193 }
f5eb5588 2194 }
83f40318 2195
83f40318 2196 cpu_buffer->nr_pages_to_update = 0;
99c37d1a 2197 cpus_read_unlock();
438ced17 2198 }
7a8e76a3
SR
2199
2200 out:
659f451f
SR
2201 /*
2202 * The ring buffer resize can happen with the ring buffer
2203 * enabled, so that the update disturbs the tracing as little
2204 * as possible. But if the buffer is disabled, we do not need
2205 * to worry about that, and we can take the time to verify
2206 * that the buffer is not corrupt.
2207 */
2208 if (atomic_read(&buffer->record_disabled)) {
2209 atomic_inc(&buffer->record_disabled);
2210 /*
2211 * Even though the buffer was disabled, we must make sure
2212 * that it is truly disabled before calling rb_check_pages.
2213 * There could have been a race between checking
 2214		 * record_disabled and incrementing it.
2215 */
74401729 2216 synchronize_rcu();
659f451f 2217 for_each_buffer_cpu(buffer, cpu) {
c2274b90
PP
2218 unsigned long flags;
2219
659f451f 2220 cpu_buffer = buffer->buffers[cpu];
c2274b90 2221 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
659f451f 2222 rb_check_pages(cpu_buffer);
c2274b90 2223 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
659f451f
SR
2224 }
2225 atomic_dec(&buffer->record_disabled);
2226 }
2227
8a96c028 2228 atomic_dec(&buffer->resizing);
7a8e76a3 2229 mutex_unlock(&buffer->mutex);
0a1754b2 2230 return 0;
7a8e76a3 2231
83f40318 2232 out_err:
438ced17
VN
2233 for_each_buffer_cpu(buffer, cpu) {
2234 struct buffer_page *bpage, *tmp;
83f40318 2235
438ced17 2236 cpu_buffer = buffer->buffers[cpu];
438ced17 2237 cpu_buffer->nr_pages_to_update = 0;
83f40318 2238
438ced17
VN
2239 if (list_empty(&cpu_buffer->new_pages))
2240 continue;
83f40318 2241
438ced17
VN
2242 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2243 list) {
2244 list_del_init(&bpage->list);
2245 free_buffer_page(bpage);
2246 }
7a8e76a3 2247 }
07b8b10e 2248 out_err_unlock:
8a96c028 2249 atomic_dec(&buffer->resizing);
641d2f63 2250 mutex_unlock(&buffer->mutex);
83f40318 2251 return err;
7a8e76a3 2252}
c4f50183 2253EXPORT_SYMBOL_GPL(ring_buffer_resize);
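/*
 * Usage sketch (hypothetical caller, values are arbitrary): grow every
 * per-cpu buffer to ~2MB, then shrink just CPU 1. A negative return means
 * the resize was rejected (e.g. -EBUSY while a reader has resizing
 * disabled, or -ENOMEM if the new pages could not be allocated).
 */
static int example_resize(struct trace_buffer *buffer)
{
	int ret;

	ret = ring_buffer_resize(buffer, 2 * 1024 * 1024, RING_BUFFER_ALL_CPUS);
	if (ret < 0)
		return ret;

	return ring_buffer_resize(buffer, 64 * 1024, 1);
}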
7a8e76a3 2254
13292494 2255void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
750912fa
DS
2256{
2257 mutex_lock(&buffer->mutex);
2258 if (val)
2259 buffer->flags |= RB_FL_OVERWRITE;
2260 else
2261 buffer->flags &= ~RB_FL_OVERWRITE;
2262 mutex_unlock(&buffer->mutex);
2263}
2264EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
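/*
 * Usage sketch (hypothetical caller): switch an existing buffer to
 * "drop new data when full" behaviour instead of overwriting old events.
 */
static void example_disable_overwrite(struct trace_buffer *buffer)
{
	ring_buffer_change_overwrite(buffer, 0);
}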
2265
2289d567 2266static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
7a8e76a3 2267{
044fa782 2268 return bpage->page->data + index;
7a8e76a3
SR
2269}
2270
2289d567 2271static __always_inline struct ring_buffer_event *
d769041f 2272rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
7a8e76a3 2273{
6f807acd
SR
2274 return __rb_page_index(cpu_buffer->reader_page,
2275 cpu_buffer->reader_page->read);
2276}
2277
785888c5
SRV
2278static struct ring_buffer_event *
2279rb_iter_head_event(struct ring_buffer_iter *iter)
bf41a158 2280{
785888c5
SRV
2281 struct ring_buffer_event *event;
2282 struct buffer_page *iter_head_page = iter->head_page;
2283 unsigned long commit;
2284 unsigned length;
2285
153368ce
SRV
2286 if (iter->head != iter->next_event)
2287 return iter->event;
2288
785888c5
SRV
2289 /*
2290 * When the writer goes across pages, it issues a cmpxchg which
2291 * is a mb(), which will synchronize with the rmb here.
2292 * (see rb_tail_page_update() and __rb_reserve_next())
2293 */
2294 commit = rb_page_commit(iter_head_page);
2295 smp_rmb();
95a404bd
SRG
2296
2297 /* An event needs to be at least 8 bytes in size */
2298 if (iter->head > commit - 8)
2299 goto reset;
2300
785888c5
SRV
2301 event = __rb_page_index(iter_head_page, iter->head);
2302 length = rb_event_length(event);
2303
2304 /*
2305 * READ_ONCE() doesn't work on functions and we don't want the
2306 * compiler doing any crazy optimizations with length.
2307 */
2308 barrier();
2309
139f8400 2310 if ((iter->head + length) > commit || length > iter->event_size)
785888c5
SRV
2311 /* Writer corrupted the read? */
2312 goto reset;
2313
2314 memcpy(iter->event, event, length);
2315 /*
2316 * If the page stamp is still the same after this rmb() then the
2317 * event was safely copied without the writer entering the page.
2318 */
2319 smp_rmb();
2320
2321 /* Make sure the page didn't change since we read this */
2322 if (iter->page_stamp != iter_head_page->page->time_stamp ||
2323 commit > rb_page_commit(iter_head_page))
2324 goto reset;
2325
2326 iter->next_event = iter->head + length;
2327 return iter->event;
2328 reset:
2329 /* Reset to the beginning */
2330 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2331 iter->head = 0;
2332 iter->next_event = 0;
c9b7a4a7 2333 iter->missed_events = 1;
785888c5 2334 return NULL;
bf41a158
SR
2335}
2336
25985edc 2337/* Size is determined by what has been committed */
2289d567 2338static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
bf41a158 2339{
fe832be0 2340 return rb_page_commit(bpage) & ~RB_MISSED_MASK;
bf41a158
SR
2341}
2342
2289d567 2343static __always_inline unsigned
bf41a158
SR
2344rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
2345{
2346 return rb_page_commit(cpu_buffer->commit_page);
2347}
2348
2289d567 2349static __always_inline unsigned
3cb30911 2350rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event)
bf41a158
SR
2351{
2352 unsigned long addr = (unsigned long)event;
2353
3cb30911
SRG
2354 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1;
2355
2356 return addr - BUF_PAGE_HDR_SIZE;
bf41a158
SR
2357}
2358
34a148bf 2359static void rb_inc_iter(struct ring_buffer_iter *iter)
d769041f
SR
2360{
2361 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2362
2363 /*
2364 * The iterator could be on the reader page (it starts there).
2365 * But the head could have moved, since the reader was
2366 * found. Check for this case and assign the iterator
2367 * to the head page instead of next.
2368 */
2369 if (iter->head_page == cpu_buffer->reader_page)
77ae365e 2370 iter->head_page = rb_set_head_page(cpu_buffer);
d769041f 2371 else
6689bed3 2372 rb_inc_page(&iter->head_page);
d769041f 2373
28e3fc56 2374 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
7a8e76a3 2375 iter->head = 0;
785888c5 2376 iter->next_event = 0;
7a8e76a3
SR
2377}
2378
77ae365e
SR
2379/*
2380 * rb_handle_head_page - writer hit the head page
2381 *
2382 * Returns: +1 to retry page
2383 * 0 to continue
2384 * -1 on error
2385 */
2386static int
2387rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
2388 struct buffer_page *tail_page,
2389 struct buffer_page *next_page)
2390{
2391 struct buffer_page *new_head;
2392 int entries;
2393 int type;
2394 int ret;
2395
2396 entries = rb_page_entries(next_page);
2397
2398 /*
2399 * The hard part is here. We need to move the head
2400 * forward, and protect against both readers on
2401 * other CPUs and writers coming in via interrupts.
2402 */
2403 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
2404 RB_PAGE_HEAD);
2405
2406 /*
2407 * type can be one of four:
2408 * NORMAL - an interrupt already moved it for us
2409 * HEAD - we are the first to get here.
2410 * UPDATE - we are the interrupt interrupting
2411 * a current move.
2412 * MOVED - a reader on another CPU moved the next
2413 * pointer to its reader page. Give up
2414 * and try again.
2415 */
2416
2417 switch (type) {
2418 case RB_PAGE_HEAD:
2419 /*
2420 * We changed the head to UPDATE, thus
2421 * it is our responsibility to update
2422 * the counters.
2423 */
2424 local_add(entries, &cpu_buffer->overrun);
45d99ea4 2425 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes);
31029a8b 2426 local_inc(&cpu_buffer->pages_lost);
77ae365e
SR
2427
2428 /*
2429 * The entries will be zeroed out when we move the
2430 * tail page.
2431 */
2432
2433 /* still more to do */
2434 break;
2435
2436 case RB_PAGE_UPDATE:
2437 /*
 2438		 * This is an interrupt that interrupted the
2439 * previous update. Still more to do.
2440 */
2441 break;
2442 case RB_PAGE_NORMAL:
2443 /*
2444 * An interrupt came in before the update
2445 * and processed this for us.
2446 * Nothing left to do.
2447 */
2448 return 1;
2449 case RB_PAGE_MOVED:
2450 /*
2451 * The reader is on another CPU and just did
2452 * a swap with our next_page.
2453 * Try again.
2454 */
2455 return 1;
2456 default:
2457 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2458 return -1;
2459 }
2460
2461 /*
2462 * Now that we are here, the old head pointer is
2463 * set to UPDATE. This will keep the reader from
2464 * swapping the head page with the reader page.
2465 * The reader (on another CPU) will spin till
2466 * we are finished.
2467 *
2468 * We just need to protect against interrupts
2469 * doing the job. We will set the next pointer
2470 * to HEAD. After that, we set the old pointer
2471 * to NORMAL, but only if it was HEAD before.
 2472	 * Otherwise we are an interrupt, and only
 2473	 * want the outermost commit to reset it.
2474 */
2475 new_head = next_page;
6689bed3 2476 rb_inc_page(&new_head);
77ae365e
SR
2477
2478 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2479 RB_PAGE_NORMAL);
2480
2481 /*
2482 * Valid returns are:
2483 * HEAD - an interrupt came in and already set it.
2484 * NORMAL - One of two things:
2485 * 1) We really set it.
2486 * 2) A bunch of interrupts came in and moved
2487 * the page forward again.
2488 */
2489 switch (ret) {
2490 case RB_PAGE_HEAD:
2491 case RB_PAGE_NORMAL:
2492 /* OK */
2493 break;
2494 default:
2495 RB_WARN_ON(cpu_buffer, 1);
2496 return -1;
2497 }
2498
2499 /*
2500 * It is possible that an interrupt came in,
2501 * set the head up, then more interrupts came in
2502 * and moved it again. When we get back here,
2503 * the page would have been set to NORMAL but we
2504 * just set it back to HEAD.
2505 *
2506 * How do you detect this? Well, if that happened
2507 * the tail page would have moved.
2508 */
2509 if (ret == RB_PAGE_NORMAL) {
8573636e
SRRH
2510 struct buffer_page *buffer_tail_page;
2511
2512 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
77ae365e
SR
2513 /*
 2514		 * If the tail had moved past next, then we need
2515 * to reset the pointer.
2516 */
8573636e
SRRH
2517 if (buffer_tail_page != tail_page &&
2518 buffer_tail_page != next_page)
77ae365e
SR
2519 rb_head_page_set_normal(cpu_buffer, new_head,
2520 next_page,
2521 RB_PAGE_HEAD);
2522 }
2523
2524 /*
 2525	 * If this was the outermost commit (the one that
2526 * changed the original pointer from HEAD to UPDATE),
2527 * then it is up to us to reset it to NORMAL.
2528 */
2529 if (type == RB_PAGE_HEAD) {
2530 ret = rb_head_page_set_normal(cpu_buffer, next_page,
2531 tail_page,
2532 RB_PAGE_UPDATE);
2533 if (RB_WARN_ON(cpu_buffer,
2534 ret != RB_PAGE_UPDATE))
2535 return -1;
2536 }
2537
2538 return 0;
2539}
2540
c7b09308
SR
2541static inline void
2542rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
fcc742ea 2543 unsigned long tail, struct rb_event_info *info)
c7b09308 2544{
139f8400 2545 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
fcc742ea 2546 struct buffer_page *tail_page = info->tail_page;
c7b09308 2547 struct ring_buffer_event *event;
fcc742ea 2548 unsigned long length = info->length;
c7b09308
SR
2549
2550 /*
2551 * Only the event that crossed the page boundary
2552 * must fill the old tail_page with padding.
2553 */
139f8400 2554 if (tail >= bsize) {
b3230c8b
SR
2555 /*
2556 * If the page was filled, then we still need
2557 * to update the real_end. Reset it to zero
2558 * and the reader will ignore it.
2559 */
139f8400 2560 if (tail == bsize)
b3230c8b
SR
2561 tail_page->real_end = 0;
2562
c7b09308
SR
2563 local_sub(length, &tail_page->write);
2564 return;
2565 }
2566
2567 event = __rb_page_index(tail_page, tail);
2568
ff0ff84a
SR
2569 /*
2570 * Save the original length to the meta data.
 2571	 * This will be used by the reader to add the lost event
 2572	 * counter.
2573 */
2574 tail_page->real_end = tail;
2575
c7b09308
SR
2576 /*
2577 * If this event is bigger than the minimum size, then
2578 * we need to be careful that we don't subtract the
2579 * write counter enough to allow another writer to slip
2580 * in on this page.
2581 * We put in a discarded commit instead, to make sure
45d99ea4
ZY
2582 * that this space is not used again, and this space will
2583 * not be accounted into 'entries_bytes'.
c7b09308
SR
2584 *
2585 * If we are less than the minimum size, we don't need to
2586 * worry about it.
2587 */
139f8400 2588 if (tail > (bsize - RB_EVNT_MIN_SIZE)) {
c7b09308
SR
2589 /* No room for any events */
2590
2591 /* Mark the rest of the page with padding */
2592 rb_event_set_padding(event);
2593
a0fcaaed
SRG
2594 /* Make sure the padding is visible before the write update */
2595 smp_wmb();
2596
c7b09308
SR
2597 /* Set the write back to the previous setting */
2598 local_sub(length, &tail_page->write);
2599 return;
2600 }
2601
2602 /* Put in a discarded event */
139f8400 2603 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE;
c7b09308
SR
2604 event->type_len = RINGBUF_TYPE_PADDING;
2605 /* time delta must be non zero */
2606 event->time_delta = 1;
c7b09308 2607
45d99ea4 2608 /* account for padding bytes */
139f8400 2609 local_add(bsize - tail, &cpu_buffer->entries_bytes);
45d99ea4 2610
a0fcaaed
SRG
2611 /* Make sure the padding is visible before the tail_page->write update */
2612 smp_wmb();
2613
c7b09308 2614 /* Set write to end of buffer */
139f8400 2615 length = (tail + length) - bsize;
c7b09308
SR
2616 local_sub(length, &tail_page->write);
2617}
6634ff26 2618
4239c38f
SRRH
2619static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2620
747e94ae
SR
2621/*
2622 * This is the slow path, force gcc not to inline it.
2623 */
2624static noinline struct ring_buffer_event *
6634ff26 2625rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
fcc742ea 2626 unsigned long tail, struct rb_event_info *info)
7a8e76a3 2627{
fcc742ea 2628 struct buffer_page *tail_page = info->tail_page;
5a50e33c 2629 struct buffer_page *commit_page = cpu_buffer->commit_page;
13292494 2630 struct trace_buffer *buffer = cpu_buffer->buffer;
77ae365e
SR
2631 struct buffer_page *next_page;
2632 int ret;
aa20ae84
SR
2633
2634 next_page = tail_page;
2635
6689bed3 2636 rb_inc_page(&next_page);
aa20ae84 2637
aa20ae84
SR
2638 /*
2639 * If for some reason, we had an interrupt storm that made
2640 * it all the way around the buffer, bail, and warn
2641 * about it.
2642 */
2643 if (unlikely(next_page == commit_page)) {
77ae365e 2644 local_inc(&cpu_buffer->commit_overrun);
aa20ae84
SR
2645 goto out_reset;
2646 }
2647
77ae365e
SR
2648 /*
2649 * This is where the fun begins!
2650 *
2651 * We are fighting against races between a reader that
2652 * could be on another CPU trying to swap its reader
2653 * page with the buffer head.
2654 *
2655 * We are also fighting against interrupts coming in and
2656 * moving the head or tail on us as well.
2657 *
2658 * If the next page is the head page then we have filled
2659 * the buffer, unless the commit page is still on the
2660 * reader page.
2661 */
6689bed3 2662 if (rb_is_head_page(next_page, &tail_page->list)) {
aa20ae84 2663
77ae365e
SR
2664 /*
2665 * If the commit is not on the reader page, then
2666 * move the header page.
2667 */
2668 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2669 /*
2670 * If we are not in overwrite mode,
2671 * this is easy, just stop here.
2672 */
884bfe89
SP
2673 if (!(buffer->flags & RB_FL_OVERWRITE)) {
2674 local_inc(&cpu_buffer->dropped_events);
77ae365e 2675 goto out_reset;
884bfe89 2676 }
77ae365e
SR
2677
2678 ret = rb_handle_head_page(cpu_buffer,
2679 tail_page,
2680 next_page);
2681 if (ret < 0)
2682 goto out_reset;
2683 if (ret)
2684 goto out_again;
2685 } else {
2686 /*
2687 * We need to be careful here too. The
2688 * commit page could still be on the reader
2689 * page. We could have a small buffer, and
2690 * have filled up the buffer with events
2691 * from interrupts and such, and wrapped.
2692 *
c6358bac 2693 * Note, if the tail page is also on the
77ae365e
SR
2694 * reader_page, we let it move out.
2695 */
2696 if (unlikely((cpu_buffer->commit_page !=
2697 cpu_buffer->tail_page) &&
2698 (cpu_buffer->commit_page ==
2699 cpu_buffer->reader_page))) {
2700 local_inc(&cpu_buffer->commit_overrun);
2701 goto out_reset;
2702 }
aa20ae84
SR
2703 }
2704 }
2705
70004986 2706 rb_tail_page_update(cpu_buffer, tail_page, next_page);
aa20ae84 2707
77ae365e 2708 out_again:
aa20ae84 2709
fcc742ea 2710 rb_reset_tail(cpu_buffer, tail, info);
aa20ae84 2711
4239c38f
SRRH
2712 /* Commit what we have for now. */
2713 rb_end_commit(cpu_buffer);
2714 /* rb_end_commit() decs committing */
2715 local_inc(&cpu_buffer->committing);
2716
aa20ae84
SR
2717 /* fail and let the caller try again */
2718 return ERR_PTR(-EAGAIN);
2719
45141d46 2720 out_reset:
6f3b3440 2721 /* reset write */
fcc742ea 2722 rb_reset_tail(cpu_buffer, tail, info);
6f3b3440 2723
bf41a158 2724 return NULL;
7a8e76a3
SR
2725}
2726
74e87937
SRV
2727/* Slow path */
2728static struct ring_buffer_event *
3cb30911
SRG
2729rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2730 struct ring_buffer_event *event, u64 delta, bool abs)
9826b273 2731{
dc4e2801
TZ
2732 if (abs)
2733 event->type_len = RINGBUF_TYPE_TIME_STAMP;
2734 else
2735 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
9826b273 2736
dc4e2801 2737 /* Not the first event on the page, or not delta? */
3cb30911 2738 if (abs || rb_event_index(cpu_buffer, event)) {
d90fd774
SRRH
2739 event->time_delta = delta & TS_MASK;
2740 event->array[0] = delta >> TS_SHIFT;
2741 } else {
2742 /* nope, just zero it */
2743 event->time_delta = 0;
2744 event->array[0] = 0;
2745 }
a4543a2f 2746
d90fd774
SRRH
2747 return skip_time_extend(event);
2748}
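/*
 * Worked example of the split above (added for illustration, assuming the
 * 27-bit time_delta field, so TS_SHIFT == 27): for delta = 0x12345678,
 * time_delta holds 0x12345678 & TS_MASK = 0x2345678 and array[0] holds
 * 0x12345678 >> 27 = 0x2. The reader reassembles the value as
 * (array[0] << TS_SHIFT) | time_delta == 0x12345678.
 */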
a4543a2f 2749
58fbc3c6
SRV
2750#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2751static inline bool sched_clock_stable(void)
2752{
2753 return true;
2754}
2755#endif
2756
74e87937 2757static void
58fbc3c6
SRV
2758rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2759 struct rb_event_info *info)
2760{
2761 u64 write_stamp;
2762
29ce2451 2763 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
58fbc3c6
SRV
2764 (unsigned long long)info->delta,
2765 (unsigned long long)info->ts,
2766 (unsigned long long)info->before,
2767 (unsigned long long)info->after,
c84897c0 2768 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}),
58fbc3c6
SRV
2769 sched_clock_stable() ? "" :
2770 "If you just came from a suspend/resume,\n"
2771 "please switch to the trace global clock:\n"
2455f0e1 2772 " echo global > /sys/kernel/tracing/trace_clock\n"
58fbc3c6
SRV
2773 "or add trace_clock=global to the kernel command line\n");
2774}
2775
74e87937
SRV
2776static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2777 struct ring_buffer_event **event,
2778 struct rb_event_info *info,
2779 u64 *delta,
2780 unsigned int *length)
2781{
2782 bool abs = info->add_timestamp &
2783 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);
2784
29ce2451 2785 if (unlikely(info->delta > (1ULL << 59))) {
6695da58
SRG
2786 /*
2787 * Some timers can use more than 59 bits, and when a timestamp
2788 * is added to the buffer, it will lose those bits.
2789 */
2790 if (abs && (info->ts & TS_MSB)) {
2791 info->delta &= ABS_TS_MASK;
2792
29ce2451 2793 /* did the clock go backwards */
6695da58 2794 } else if (info->before == info->after && info->before > info->ts) {
29ce2451
SRV
2795 /* not interrupted */
2796 static int once;
2797
2798 /*
 2799			 * This is possible with a recalibration of the TSC.
2800 * Do not produce a call stack, but just report it.
2801 */
2802 if (!once) {
2803 once++;
2804 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
2805 info->before, info->ts);
2806 }
2807 } else
2808 rb_check_timestamp(cpu_buffer, info);
2809 if (!abs)
2810 info->delta = 0;
2811 }
3cb30911 2812 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs);
74e87937
SRV
2813 *length -= RB_LEN_TIME_EXTEND;
2814 *delta = 0;
2815}
2816
d90fd774
SRRH
2817/**
2818 * rb_update_event - update event type and data
cfc585a4 2819 * @cpu_buffer: The per cpu buffer of the @event
d90fd774 2820 * @event: the event to update
cfc585a4 2821 * @info: The info to update the @event with (contains length and delta)
d90fd774 2822 *
cfc585a4 2823 * Update the type and data fields of the @event. The length
d90fd774
SRRH
2824 * is the actual size that is written to the ring buffer,
2825 * and with this, we can determine what to place into the
2826 * data field.
2827 */
b7dc42fd 2828static void
d90fd774
SRRH
2829rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2830 struct ring_buffer_event *event,
2831 struct rb_event_info *info)
2832{
2833 unsigned length = info->length;
2834 u64 delta = info->delta;
8672e494
SRV
2835 unsigned int nest = local_read(&cpu_buffer->committing) - 1;
2836
a948c69d 2837 if (!WARN_ON_ONCE(nest >= MAX_NEST))
8672e494 2838 cpu_buffer->event_stamp[nest] = info->ts;
a4543a2f
SRRH
2839
2840 /*
d90fd774 2841 * If we need to add a timestamp, then we
6167c205 2842 * add it to the start of the reserved space.
a4543a2f 2843 */
74e87937
SRV
2844 if (unlikely(info->add_timestamp))
2845 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length);
a4543a2f 2846
d90fd774
SRRH
2847 event->time_delta = delta;
2848 length -= RB_EVNT_HDR_SIZE;
adab66b7 2849 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
d90fd774
SRRH
2850 event->type_len = 0;
2851 event->array[0] = length;
2852 } else
2853 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2854}
2855
2856static unsigned rb_calculate_event_length(unsigned length)
2857{
2858 struct ring_buffer_event event; /* Used only for sizeof array */
2859
 2860	/* zero length can cause confusion */
2861 if (!length)
2862 length++;
2863
adab66b7 2864 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
d90fd774
SRRH
2865 length += sizeof(event.array[0]);
2866
2867 length += RB_EVNT_HDR_SIZE;
adab66b7 2868 length = ALIGN(length, RB_ARCH_ALIGNMENT);
d90fd774
SRRH
2869
2870 /*
2871 * In case the time delta is larger than the 27 bits for it
2872 * in the header, we need to add a timestamp. If another
2873 * event comes in when trying to discard this one to increase
2874 * the length, then the timestamp will be added in the allocated
2875 * space of this event. If length is bigger than the size needed
2876 * for the TIME_EXTEND, then padding has to be used. The events
2877 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2878 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2879 * As length is a multiple of 4, we only need to worry if it
2880 * is 12 (RB_LEN_TIME_EXTEND + 4).
2881 */
2882 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2883 length += RB_ALIGNMENT;
2884
2885 return length;
2886}
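/*
 * Worked example of the bump above (added for illustration, assuming
 * 4-byte RB_ALIGNMENT, a 4-byte event header and no forced 8-byte
 * alignment): a requested length of 5 becomes 5 + 4 = 9, aligned up to
 * 12, which equals RB_LEN_TIME_EXTEND + 4 and so is padded to 16. A
 * discarded event of that size can then always be replaced by a
 * TIME_EXTEND (8 bytes) plus a minimum-sized padding event (8 bytes).
 */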
2887
bc92b956 2888static inline bool
d90fd774
SRRH
2889rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2890 struct ring_buffer_event *event)
2891{
2892 unsigned long new_index, old_index;
2893 struct buffer_page *bpage;
d90fd774
SRRH
2894 unsigned long addr;
2895
3cb30911 2896 new_index = rb_event_index(cpu_buffer, event);
d90fd774
SRRH
2897 old_index = new_index + rb_event_ts_length(event);
2898 addr = (unsigned long)event;
3cb30911 2899 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);
d90fd774 2900
8573636e 2901 bpage = READ_ONCE(cpu_buffer->tail_page);
d90fd774 2902
083e9f65
SRG
2903 /*
2904 * Make sure the tail_page is still the same and
2905 * the next write location is the end of this event
2906 */
d90fd774
SRRH
2907 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2908 unsigned long write_mask =
2909 local_read(&bpage->write) & ~RB_WRITE_MASK;
2910 unsigned long event_length = rb_event_length(event);
a389d86f 2911
b2dd7975
SRG
2912 /*
 2913		 * Make the before_stamp different from the write_stamp, so
 2914		 * that the next event adds an absolute
 2915		 * value and does not rely on the saved write stamp, which
 2916		 * is now going to be bogus.
083e9f65
SRG
2917 *
2918 * By setting the before_stamp to zero, the next event
2919 * is not going to use the write_stamp and will instead
2920 * create an absolute timestamp. This means there's no
 2921		 * reason to update the write_stamp!
b2dd7975
SRG
2922 */
2923 rb_time_set(&cpu_buffer->before_stamp, 0);
2924
a389d86f
SRV
2925 /*
2926 * If an event were to come in now, it would see that the
2927 * write_stamp and the before_stamp are different, and assume
2928 * that this event just added itself before updating
2929 * the write stamp. The interrupting event will fix the
083e9f65 2930 * write stamp for us, and use an absolute timestamp.
a389d86f
SRV
2931 */
2932
d90fd774
SRRH
2933 /*
2934 * This is on the tail page. It is possible that
2935 * a write could come in and move the tail page
2936 * and write to the next page. That is fine
2937 * because we just shorten what is on this page.
2938 */
2939 old_index += write_mask;
2940 new_index += write_mask;
00a8478f
UB
2941
2942 /* caution: old_index gets updated on cmpxchg failure */
2943 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
d90fd774
SRRH
2944 /* update counters */
2945 local_sub(event_length, &cpu_buffer->entries_bytes);
bc92b956 2946 return true;
d90fd774
SRRH
2947 }
2948 }
2949
2950 /* could not discard */
bc92b956 2951 return false;
d90fd774
SRRH
2952}
2953
2954static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2955{
2956 local_inc(&cpu_buffer->committing);
2957 local_inc(&cpu_buffer->commits);
2958}
2959
38e11df1 2960static __always_inline void
d90fd774
SRRH
2961rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2962{
2963 unsigned long max_count;
2964
2965 /*
2966 * We only race with interrupts and NMIs on this CPU.
2967 * If we own the commit event, then we can commit
2968 * all others that interrupted us, since the interruptions
2969 * are in stack format (they finish before they come
2970 * back to us). This allows us to do a simple loop to
2971 * assign the commit to the tail.
2972 */
2973 again:
2974 max_count = cpu_buffer->nr_pages * 100;
2975
8573636e 2976 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
d90fd774
SRRH
2977 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2978 return;
2979 if (RB_WARN_ON(cpu_buffer,
2980 rb_is_reader_page(cpu_buffer->tail_page)))
2981 return;
6455b616
ZY
2982 /*
2983 * No need for a memory barrier here, as the update
2984 * of the tail_page did it for this page.
2985 */
d90fd774
SRRH
2986 local_set(&cpu_buffer->commit_page->page->commit,
2987 rb_page_write(cpu_buffer->commit_page));
6689bed3 2988 rb_inc_page(&cpu_buffer->commit_page);
d90fd774
SRRH
2989 /* add barrier to keep gcc from optimizing too much */
2990 barrier();
2991 }
2992 while (rb_commit_index(cpu_buffer) !=
2993 rb_page_write(cpu_buffer->commit_page)) {
2994
6455b616
ZY
2995 /* Make sure the readers see the content of what is committed. */
2996 smp_wmb();
d90fd774
SRRH
2997 local_set(&cpu_buffer->commit_page->page->commit,
2998 rb_page_write(cpu_buffer->commit_page));
2999 RB_WARN_ON(cpu_buffer,
3000 local_read(&cpu_buffer->commit_page->page->commit) &
3001 ~RB_WRITE_MASK);
3002 barrier();
3003 }
3004
3005 /* again, keep gcc from optimizing */
3006 barrier();
3007
3008 /*
3009 * If an interrupt came in just after the first while loop
3010 * and pushed the tail page forward, we will be left with
3011 * a dangling commit that will never go forward.
3012 */
8573636e 3013 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
d90fd774
SRRH
3014 goto again;
3015}
3016
38e11df1 3017static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
d90fd774
SRRH
3018{
3019 unsigned long commits;
3020
3021 if (RB_WARN_ON(cpu_buffer,
3022 !local_read(&cpu_buffer->committing)))
3023 return;
3024
3025 again:
3026 commits = local_read(&cpu_buffer->commits);
3027 /* synchronize with interrupts */
3028 barrier();
3029 if (local_read(&cpu_buffer->committing) == 1)
3030 rb_set_commit_to_write(cpu_buffer);
3031
3032 local_dec(&cpu_buffer->committing);
3033
3034 /* synchronize with interrupts */
3035 barrier();
3036
3037 /*
3038 * Need to account for interrupts coming in between the
3039 * updating of the commit page and the clearing of the
3040 * committing counter.
3041 */
3042 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
3043 !local_read(&cpu_buffer->committing)) {
3044 local_inc(&cpu_buffer->committing);
3045 goto again;
3046 }
3047}
3048
3049static inline void rb_event_discard(struct ring_buffer_event *event)
3050{
dc4e2801 3051 if (extended_time(event))
d90fd774
SRRH
3052 event = skip_time_extend(event);
3053
3054 /* array[0] holds the actual length for the discarded event */
3055 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
3056 event->type_len = RINGBUF_TYPE_PADDING;
3057 /* time delta must be non zero */
3058 if (!event->time_delta)
3059 event->time_delta = 1;
3060}
3061
04aabc32 3062static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
d90fd774
SRRH
3063{
3064 local_inc(&cpu_buffer->entries);
d90fd774
SRRH
3065 rb_end_commit(cpu_buffer);
3066}
3067
3068static __always_inline void
13292494 3069rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
d90fd774 3070{
d90fd774
SRRH
3071 if (buffer->irq_work.waiters_pending) {
3072 buffer->irq_work.waiters_pending = false;
 3073		/* irq_work_queue() supplies its own memory barriers */
3074 irq_work_queue(&buffer->irq_work.work);
3075 }
3076
3077 if (cpu_buffer->irq_work.waiters_pending) {
3078 cpu_buffer->irq_work.waiters_pending = false;
 3079		/* irq_work_queue() supplies its own memory barriers */
3080 irq_work_queue(&cpu_buffer->irq_work.work);
3081 }
3082
03329f99
SRV
3083 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
3084 return;
d90fd774 3085
03329f99
SRV
3086 if (cpu_buffer->reader_page == cpu_buffer->commit_page)
3087 return;
2c2b0a78 3088
03329f99
SRV
3089 if (!cpu_buffer->irq_work.full_waiters_pending)
3090 return;
2c2b0a78 3091
03329f99
SRV
3092 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
3093
42fb0a1e 3094 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
03329f99
SRV
3095 return;
3096
3097 cpu_buffer->irq_work.wakeup_full = true;
3098 cpu_buffer->irq_work.full_waiters_pending = false;
 3099	/* irq_work_queue() supplies its own memory barriers */
3100 irq_work_queue(&cpu_buffer->irq_work.work);
d90fd774
SRRH
3101}
3102
28575c61
SRV
3103#ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
3104# define do_ring_buffer_record_recursion() \
3105 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
3106#else
3107# define do_ring_buffer_record_recursion() do { } while (0)
3108#endif
3109
d90fd774
SRRH
3110/*
3111 * The lock and unlock are done within a preempt disable section.
3112 * The current_context per_cpu variable can only be modified
3113 * by the current task between lock and unlock. But it can
a0e3a18f
SRV
3114 * be modified more than once via an interrupt. To pass this
3115 * information from the lock to the unlock without having to
3116 * access the 'in_interrupt()' functions again (which do show
 3117 * a bit of overhead in something as critical as function tracing),
3118 * we use a bitmask trick.
d90fd774 3119 *
b02414c8
SRV
3120 * bit 1 = NMI context
3121 * bit 2 = IRQ context
3122 * bit 3 = SoftIRQ context
3123 * bit 4 = normal context.
d90fd774 3124 *
a0e3a18f
SRV
3125 * This works because this is the order of contexts that can
3126 * preempt other contexts. A SoftIRQ never preempts an IRQ
3127 * context.
3128 *
3129 * When the context is determined, the corresponding bit is
3130 * checked and set (if it was set, then a recursion of that context
3131 * happened).
3132 *
3133 * On unlock, we need to clear this bit. To do so, just subtract
3134 * 1 from the current_context and AND it to itself.
3135 *
3136 * (binary)
3137 * 101 - 1 = 100
3138 * 101 & 100 = 100 (clearing bit zero)
3139 *
3140 * 1010 - 1 = 1001
3141 * 1010 & 1001 = 1000 (clearing bit 1)
3142 *
3143 * The least significant bit can be cleared this way, and it
3144 * just so happens that it is the same bit corresponding to
3145 * the current context.
b02414c8
SRV
3146 *
3147 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
3148 * is set when a recursion is detected at the current context, and if
3149 * the TRANSITION bit is already set, it will fail the recursion.
3150 * This is needed because there's a lag between the changing of
3151 * interrupt context and updating the preempt count. In this case,
3152 * a false positive will be found. To handle this, one extra recursion
3153 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
3154 * bit is already set, then it is considered a recursion and the function
3155 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
3156 *
3157 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
3158 * to be cleared. Even if it wasn't the context that set it. That is,
3159 * if an interrupt comes in while NORMAL bit is set and the ring buffer
3160 * is called before preempt_count() is updated, since the check will
3161 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
3162 * NMI then comes in, it will set the NMI bit, but when the NMI code
f2cc020d 3163 * does the trace_recursive_unlock() it will clear the TRANSITION bit
b02414c8
SRV
3164 * and leave the NMI bit set. But this is fine, because the interrupt
3165 * code that set the TRANSITION bit will then clear the NMI bit when it
3166 * calls trace_recursive_unlock(). If another NMI comes in, it will
3167 * set the TRANSITION bit and continue.
3168 *
3169 * Note: The TRANSITION bit only handles a single transition between context.
d90fd774
SRRH
3170 */
3171
bc92b956 3172static __always_inline bool
d90fd774
SRRH
3173trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
3174{
a0e3a18f 3175 unsigned int val = cpu_buffer->current_context;
91ebe8bc 3176 int bit = interrupt_context_level();
9b84fadc
SRV
3177
3178 bit = RB_CTX_NORMAL - bit;
a0e3a18f 3179
b02414c8
SRV
3180 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
3181 /*
3182 * It is possible that this was called by transitioning
 3183		 * between interrupt contexts, and preempt_count() has not
3184 * been updated yet. In this case, use the TRANSITION bit.
3185 */
3186 bit = RB_CTX_TRANSITION;
28575c61
SRV
3187 if (val & (1 << (bit + cpu_buffer->nest))) {
3188 do_ring_buffer_record_recursion();
bc92b956 3189 return true;
28575c61 3190 }
b02414c8 3191 }
d90fd774 3192
8e012066 3193 val |= (1 << (bit + cpu_buffer->nest));
a0e3a18f 3194 cpu_buffer->current_context = val;
d90fd774 3195
bc92b956 3196 return false;
d90fd774
SRRH
3197}
3198
3199static __always_inline void
3200trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
3201{
8e012066
SRV
3202 cpu_buffer->current_context &=
3203 cpu_buffer->current_context - (1 << cpu_buffer->nest);
3204}
3205
b02414c8
SRV
3206/* The recursive locking above uses 5 bits */
3207#define NESTED_BITS 5
8e012066
SRV
3208
3209/**
3210 * ring_buffer_nest_start - Allow to trace while nested
3211 * @buffer: The ring buffer to modify
3212 *
6167c205 3213 * The ring buffer has a safety mechanism to prevent recursion.
8e012066
SRV
3214 * But there may be a case where a trace needs to be done while
 3215 * tracing something else. In this case, calling this function
 3216 * allows a subsequent ring_buffer_lock_reserve() to nest within a
 3217 * currently active one.
3218 *
3219 * Call this function before calling another ring_buffer_lock_reserve() and
3220 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
3221 */
13292494 3222void ring_buffer_nest_start(struct trace_buffer *buffer)
8e012066
SRV
3223{
3224 struct ring_buffer_per_cpu *cpu_buffer;
3225 int cpu;
3226
3227 /* Enabled by ring_buffer_nest_end() */
3228 preempt_disable_notrace();
3229 cpu = raw_smp_processor_id();
3230 cpu_buffer = buffer->buffers[cpu];
6167c205 3231 /* This is the shift value for the above recursive locking */
8e012066
SRV
3232 cpu_buffer->nest += NESTED_BITS;
3233}
3234
3235/**
3236 * ring_buffer_nest_end - Allow to trace while nested
3237 * @buffer: The ring buffer to modify
3238 *
3239 * Must be called after ring_buffer_nest_start() and after the
3240 * ring_buffer_unlock_commit().
3241 */
13292494 3242void ring_buffer_nest_end(struct trace_buffer *buffer)
8e012066
SRV
3243{
3244 struct ring_buffer_per_cpu *cpu_buffer;
3245 int cpu;
3246
3247 /* disabled by ring_buffer_nest_start() */
3248 cpu = raw_smp_processor_id();
3249 cpu_buffer = buffer->buffers[cpu];
6167c205 3250 /* This is the shift value for the above recursive locking */
8e012066
SRV
3251 cpu_buffer->nest -= NESTED_BITS;
3252 preempt_enable_notrace();
d90fd774
SRRH
3253}
3254
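/*
 * Usage sketch for the pair above (hypothetical caller, not part of this
 * file): writing an event while another ring_buffer_lock_reserve() is
 * already active on this buffer. ring_buffer_nest_end() must be called
 * even if the nested reserve fails.
 */
static void example_nested_write(struct trace_buffer *buffer,
				 const void *data, unsigned long len)
{
	struct ring_buffer_event *event;

	ring_buffer_nest_start(buffer);
	event = ring_buffer_lock_reserve(buffer, len);
	if (event) {
		memcpy(ring_buffer_event_data(event), data, len);
		ring_buffer_unlock_commit(buffer);
	}
	ring_buffer_nest_end(buffer);
}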
3255/**
 3256 * ring_buffer_unlock_commit - commit a reserved event
3257 * @buffer: The buffer to commit to
d90fd774
SRRH
3258 *
3259 * This commits the data to the ring buffer, and releases any locks held.
3260 *
3261 * Must be paired with ring_buffer_lock_reserve.
3262 */
04aabc32 3263int ring_buffer_unlock_commit(struct trace_buffer *buffer)
d90fd774
SRRH
3264{
3265 struct ring_buffer_per_cpu *cpu_buffer;
3266 int cpu = raw_smp_processor_id();
3267
3268 cpu_buffer = buffer->buffers[cpu];
3269
04aabc32 3270 rb_commit(cpu_buffer);
d90fd774
SRRH
3271
3272 rb_wakeups(buffer, cpu_buffer);
3273
3274 trace_recursive_unlock(cpu_buffer);
3275
3276 preempt_enable_notrace();
3277
3278 return 0;
3279}
3280EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
3281
5b7be9c7
SRV
3282/* Special value to validate all deltas on a page. */
3283#define CHECK_FULL_PAGE 1L
3284
3285#ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS
d40dbb61
SRG
3286
3287static const char *show_irq_str(int bits)
3288{
3289 const char *type[] = {
3290 ".", // 0
3291 "s", // 1
3292 "h", // 2
3293 "Hs", // 3
3294 "n", // 4
3295 "Ns", // 5
3296 "Nh", // 6
3297 "NHs", // 7
3298 };
3299
3300 return type[bits];
3301}
3302
 3303/* Assume this is a trace event */
3304static const char *show_flags(struct ring_buffer_event *event)
3305{
3306 struct trace_entry *entry;
3307 int bits = 0;
3308
3309 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry))
3310 return "X";
3311
3312 entry = ring_buffer_event_data(event);
3313
3314 if (entry->flags & TRACE_FLAG_SOFTIRQ)
3315 bits |= 1;
3316
3317 if (entry->flags & TRACE_FLAG_HARDIRQ)
3318 bits |= 2;
3319
3320 if (entry->flags & TRACE_FLAG_NMI)
3321 bits |= 4;
3322
3323 return show_irq_str(bits);
3324}
3325
3326static const char *show_irq(struct ring_buffer_event *event)
3327{
3328 struct trace_entry *entry;
3329
3330 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry))
3331 return "";
3332
3333 entry = ring_buffer_event_data(event);
3334 if (entry->flags & TRACE_FLAG_IRQS_OFF)
3335 return "d";
3336 return "";
3337}
3338
3339static const char *show_interrupt_level(void)
3340{
3341 unsigned long pc = preempt_count();
3342 unsigned char level = 0;
3343
3344 if (pc & SOFTIRQ_OFFSET)
3345 level |= 1;
3346
3347 if (pc & HARDIRQ_MASK)
3348 level |= 2;
3349
3350 if (pc & NMI_MASK)
3351 level |= 4;
3352
3353 return show_irq_str(level);
3354}
3355
5b7be9c7
SRV
3356static void dump_buffer_page(struct buffer_data_page *bpage,
3357 struct rb_event_info *info,
3358 unsigned long tail)
3359{
3360 struct ring_buffer_event *event;
3361 u64 ts, delta;
3362 int e;
3363
3364 ts = bpage->time_stamp;
3365 pr_warn(" [%lld] PAGE TIME STAMP\n", ts);
3366
3367 for (e = 0; e < tail; e += rb_event_length(event)) {
3368
3369 event = (struct ring_buffer_event *)(bpage->data + e);
3370
3371 switch (event->type_len) {
3372
3373 case RINGBUF_TYPE_TIME_EXTEND:
e20044f7 3374 delta = rb_event_time_stamp(event);
5b7be9c7 3375 ts += delta;
0b9036ef
SRG
3376 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n",
3377 e, ts, delta);
5b7be9c7
SRV
3378 break;
3379
3380 case RINGBUF_TYPE_TIME_STAMP:
e20044f7 3381 delta = rb_event_time_stamp(event);
6695da58 3382 ts = rb_fix_abs_ts(delta, ts);
0b9036ef
SRG
3383 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n",
3384 e, ts, delta);
5b7be9c7
SRV
3385 break;
3386
3387 case RINGBUF_TYPE_PADDING:
3388 ts += event->time_delta;
0b9036ef
SRG
3389 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n",
3390 e, ts, event->time_delta);
5b7be9c7
SRV
3391 break;
3392
3393 case RINGBUF_TYPE_DATA:
3394 ts += event->time_delta;
d40dbb61
SRG
3395 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n",
3396 e, ts, event->time_delta,
3397 show_flags(event), show_irq(event));
5b7be9c7
SRV
3398 break;
3399
3400 default:
3401 break;
3402 }
3403 }
0b9036ef 3404 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e);
5b7be9c7
SRV
3405}
3406
3407static DEFINE_PER_CPU(atomic_t, checking);
3408static atomic_t ts_dump;
3409
f50345b4
SRG
3410#define buffer_warn_return(fmt, ...) \
3411 do { \
3412 /* If another report is happening, ignore this one */ \
3413 if (atomic_inc_return(&ts_dump) != 1) { \
3414 atomic_dec(&ts_dump); \
3415 goto out; \
3416 } \
3417 atomic_inc(&cpu_buffer->record_disabled); \
3418 pr_warn(fmt, ##__VA_ARGS__); \
3419 dump_buffer_page(bpage, info, tail); \
3420 atomic_dec(&ts_dump); \
3421 /* There are some cases during boot up where this can happen */ \
3422 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \
3423 /* Do not re-enable checking */ \
3424 return; \
3425 } while (0)
3426
5b7be9c7
SRV
3427/*
3428 * Check if the current event time stamp matches the deltas on
3429 * the buffer page.
3430 */
3431static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3432 struct rb_event_info *info,
3433 unsigned long tail)
3434{
3435 struct ring_buffer_event *event;
3436 struct buffer_data_page *bpage;
3437 u64 ts, delta;
3438 bool full = false;
3439 int e;
3440
3441 bpage = info->tail_page->page;
3442
3443 if (tail == CHECK_FULL_PAGE) {
3444 full = true;
3445 tail = local_read(&bpage->commit);
3446 } else if (info->add_timestamp &
3447 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
3448 /* Ignore events with absolute time stamps */
3449 return;
3450 }
3451
3452 /*
3453 * Do not check the first event (skip possible extends too).
3454 * Also do not check if previous events have not been committed.
3455 */
3456 if (tail <= 8 || tail > local_read(&bpage->commit))
3457 return;
3458
3459 /*
083e9f65 3460 * If this interrupted another event's check, skip this one.
5b7be9c7
SRV
3461 */
3462 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
3463 goto out;
3464
3465 ts = bpage->time_stamp;
3466
3467 for (e = 0; e < tail; e += rb_event_length(event)) {
3468
3469 event = (struct ring_buffer_event *)(bpage->data + e);
3470
3471 switch (event->type_len) {
3472
3473 case RINGBUF_TYPE_TIME_EXTEND:
e20044f7 3474 delta = rb_event_time_stamp(event);
5b7be9c7
SRV
3475 ts += delta;
3476 break;
3477
3478 case RINGBUF_TYPE_TIME_STAMP:
e20044f7 3479 delta = rb_event_time_stamp(event);
f50345b4
SRG
3480 delta = rb_fix_abs_ts(delta, ts);
3481 if (delta < ts) {
3482 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n",
3483 cpu_buffer->cpu, ts, delta);
3484 }
3485 ts = delta;
5b7be9c7
SRV
3486 break;
3487
3488 case RINGBUF_TYPE_PADDING:
3489 if (event->time_delta == 1)
3490 break;
957cdcd9 3491 fallthrough;
5b7be9c7
SRV
3492 case RINGBUF_TYPE_DATA:
3493 ts += event->time_delta;
3494 break;
3495
3496 default:
3497 RB_WARN_ON(cpu_buffer, 1);
3498 }
3499 }
3500 if ((full && ts > info->ts) ||
3501 (!full && ts + info->delta != info->ts)) {
f50345b4
SRG
3502 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n",
3503 cpu_buffer->cpu,
3504 ts + info->delta, info->ts, info->delta,
3505 info->before, info->after,
3506 full ? " (full)" : "", show_interrupt_level());
5b7be9c7
SRV
3507 }
3508out:
3509 atomic_dec(this_cpu_ptr(&checking));
3510}
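/*
 * Worked example of the non-full check above, with made-up numbers: if
 * the sub-buffer's time_stamp is 1000 and the events already committed
 * on it carry deltas of 30 and 10, walking the page yields ts = 1040.
 * A new event reserved with info->delta == 5 must then report
 * info->ts == 1045; any other value trips the "TIME DOES NOT MATCH"
 * warning (unless an absolute timestamp was forced for this event).
 */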
3511#else
3512static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3513 struct rb_event_info *info,
3514 unsigned long tail)
3515{
3516}
3517#endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */
3518
6634ff26
SR
3519static struct ring_buffer_event *
3520__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
fcc742ea 3521 struct rb_event_info *info)
6634ff26 3522{
6634ff26 3523 struct ring_buffer_event *event;
fcc742ea 3524 struct buffer_page *tail_page;
a389d86f 3525 unsigned long tail, write, w;
69d1b839 3526
8573636e
SRRH
3527 /* Don't let the compiler play games with cpu_buffer->tail_page */
3528 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
a389d86f
SRV
3529
3530 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK;
3531 barrier();
c84897c0
SRG
3532 rb_time_read(&cpu_buffer->before_stamp, &info->before);
3533 rb_time_read(&cpu_buffer->write_stamp, &info->after);
a389d86f
SRV
3534 barrier();
3535 info->ts = rb_time_stamp(cpu_buffer->buffer);
3536
58fbc3c6 3537 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
a389d86f 3538 info->delta = info->ts;
a389d86f 3539 } else {
58fbc3c6
SRV
3540 /*
3541 * If interrupting an event time update, we may need an
3542 * absolute timestamp.
3543 * Don't bother if this is the start of a new page (w == 0).
3544 */
b3ae7b67
SRG
3545 if (!w) {
3546 /* Use the sub-buffer timestamp */
3547 info->delta = 0;
c84897c0 3548 } else if (unlikely(info->before != info->after)) {
58fbc3c6
SRV
3549 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
3550 info->length += RB_LEN_TIME_EXTEND;
3551 } else {
3552 info->delta = info->ts - info->after;
3553 if (unlikely(test_time_stamp(info->delta))) {
3554 info->add_timestamp |= RB_ADD_STAMP_EXTEND;
3555 info->length += RB_LEN_TIME_EXTEND;
3556 }
10464b4a 3557 }
7c4b4a51 3558 }
b7dc42fd 3559
10464b4a 3560 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts);
a389d86f
SRV
3561
3562 /*C*/ write = local_add_return(info->length, &tail_page->write);
77ae365e
SR
3563
3564 /* set write to only the index of the write */
3565 write &= RB_WRITE_MASK;
a389d86f 3566
fcc742ea 3567 tail = write - info->length;
6634ff26 3568
a389d86f 3569 /* See if we shot past the end of this buffer page */
139f8400 3570 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) {
9e45e39d 3571 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
a389d86f
SRV
3572 return rb_move_tail(cpu_buffer, tail, info);
3573 }
3574
3575 if (likely(tail == w)) {
a389d86f 3576 /* Nothing interrupted us between A and C */
10464b4a 3577 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts);
dd939425
SRG
3578 /*
3579 * If something came in between C and D, the write stamp
3580 * may now not be in sync. But that's fine as the before_stamp
3581 * will be different and then the next event will just be forced
3582 * to use an absolute timestamp.
3583 */
7c4b4a51
SRV
3584 if (likely(!(info->add_timestamp &
3585 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
a389d86f 3586 /* This did not interrupt any time update */
58fbc3c6 3587 info->delta = info->ts - info->after;
a389d86f 3588 else
82db909e 3589 /* Just use full timestamp for interrupting event */
a389d86f 3590 info->delta = info->ts;
5b7be9c7 3591 check_buffer(cpu_buffer, info, tail);
a389d86f
SRV
3592 } else {
3593 u64 ts;
3594 /* SLOW PATH - Interrupted between A and C */
b803d7c6
SRG
3595
3596 /* Save the old before_stamp */
c84897c0 3597 rb_time_read(&cpu_buffer->before_stamp, &info->before);
b803d7c6
SRG
3598
3599 /*
3600 * Read a new timestamp and update the before_stamp to make
3601 * the next event after this one force using an absolute
3602 * timestamp. This is in case an interrupt were to come in
3603 * between E and F.
3604 */
a389d86f 3605 ts = rb_time_stamp(cpu_buffer->buffer);
b803d7c6
SRG
3606 rb_time_set(&cpu_buffer->before_stamp, ts);
3607
3608 barrier();
c84897c0 3609 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after);
a389d86f 3610 barrier();
b803d7c6
SRG
3611 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
3612 info->after == info->before && info->after < ts) {
3613 /*
3614 * Nothing came after this event between C and F, it is
3615 * safe to use info->after for the delta as it
3616 * matched info->before and is still valid.
3617 */
58fbc3c6 3618 info->delta = ts - info->after;
a389d86f
SRV
3619 } else {
3620 /*
b803d7c6 3621 * Interrupted between C and F:
a389d86f
SRV
3622 * Lost the previous events time stamp. Just set the
3623 * delta to zero, and this will be the same time as
3624 * the event this event interrupted. And the events that
3625 * came after this will still be correct (as they would
3626 * have built their delta on the previous event.
3627 */
3628 info->delta = 0;
3629 }
8672e494 3630 info->ts = ts;
7c4b4a51 3631 info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
a389d86f
SRV
3632 }
3633
6634ff26 3634 /*
a4543a2f 3635 * If this is the first commit on the page, then it has the same
b7dc42fd 3636 * timestamp as the page itself.
6634ff26 3637 */
7c4b4a51
SRV
3638 if (unlikely(!tail && !(info->add_timestamp &
3639 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
a4543a2f
SRRH
3640 info->delta = 0;
3641
b7dc42fd
SRRH
3642 /* We reserved something on the buffer */
3643
3644 event = __rb_page_index(tail_page, tail);
a4543a2f
SRRH
3645 rb_update_event(cpu_buffer, event, info);
3646
3647 local_inc(&tail_page->entries);
6634ff26 3648
b7dc42fd
SRRH
3649 /*
3650 * If this is the first commit on the page, then update
3651 * its timestamp.
3652 */
75b21c6d 3653 if (unlikely(!tail))
b7dc42fd
SRRH
3654 tail_page->page->time_stamp = info->ts;
3655
c64e148a 3656 /* account for these added bytes */
fcc742ea 3657 local_add(info->length, &cpu_buffer->entries_bytes);
c64e148a 3658
6634ff26
SR
3659 return event;
3660}
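/*
 * Rough illustration of how the interruption check above works (the
 * numbers are made up): the writer samples w = 64 at A and reserves a
 * 32 byte event at C, so local_add_return() returns 96 and
 * tail = 96 - 32 = 64 == w. If an interrupting writer reserved its own
 * event between A and C, the returned index is larger, tail != w, and
 * the slow path recomputes the delta (possibly forcing it to zero)
 * instead of trusting the write_stamp sampled right after A.
 */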
3661
fa7ffb39 3662static __always_inline struct ring_buffer_event *
13292494 3663rb_reserve_next_event(struct trace_buffer *buffer,
62f0b3eb 3664 struct ring_buffer_per_cpu *cpu_buffer,
1cd8d735 3665 unsigned long length)
7a8e76a3
SR
3666{
3667 struct ring_buffer_event *event;
fcc742ea 3668 struct rb_event_info info;
818e3dd3 3669 int nr_loops = 0;
58fbc3c6 3670 int add_ts_default;
7a8e76a3 3671
71229230
SRG
3672 /* ring buffer does cmpxchg, make sure it is safe in NMI context */
3673 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) &&
3674 (unlikely(in_nmi()))) {
3675 return NULL;
3676 }
3677
fa743953 3678 rb_start_commit(cpu_buffer);
a389d86f 3679 /* The commit page can not change after this */
fa743953 3680
85bac32c 3681#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
62f0b3eb
SR
3682 /*
3683 * Due to the ability to swap a cpu buffer from a buffer
3684 * it is possible it was swapped before we committed.
3685 * (committing stops a swap). We check for it here and
3686 * if it happened, we have to fail the write.
3687 */
3688 barrier();
6aa7de05 3689 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
62f0b3eb
SR
3690 local_dec(&cpu_buffer->committing);
3691 local_dec(&cpu_buffer->commits);
3692 return NULL;
3693 }
85bac32c 3694#endif
b7dc42fd 3695
fcc742ea 3696 info.length = rb_calculate_event_length(length);
58fbc3c6
SRV
3697
3698 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
3699 add_ts_default = RB_ADD_STAMP_ABSOLUTE;
3700 info.length += RB_LEN_TIME_EXTEND;
139f8400 3701 if (info.length > cpu_buffer->buffer->max_data_size)
b3ae7b67 3702 goto out_fail;
58fbc3c6
SRV
3703 } else {
3704 add_ts_default = RB_ADD_STAMP_NONE;
3705 }
3706
a4543a2f 3707 again:
58fbc3c6 3708 info.add_timestamp = add_ts_default;
b7dc42fd
SRRH
3709 info.delta = 0;
3710
818e3dd3
SR
3711 /*
3712 * We allow for interrupts to reenter here and do a trace.
3713 * If one does, it will cause this original code to loop
3714 * back here. Even with heavy interrupts happening, this
3715 * should only happen a few times in a row. If this happens
3716 * 1000 times in a row, there must be either an interrupt
3717 * storm or we have something buggy.
3718 * Bail!
3719 */
3e89c7bb 3720 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
fa743953 3721 goto out_fail;
818e3dd3 3722
fcc742ea
SRRH
3723 event = __rb_reserve_next(cpu_buffer, &info);
3724
bd1b7cd3 3725 if (unlikely(PTR_ERR(event) == -EAGAIN)) {
58fbc3c6 3726 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
bd1b7cd3 3727 info.length -= RB_LEN_TIME_EXTEND;
bf41a158 3728 goto again;
bd1b7cd3 3729 }
bf41a158 3730
a389d86f
SRV
3731 if (likely(event))
3732 return event;
fa743953
SR
3733 out_fail:
3734 rb_end_commit(cpu_buffer);
3735 return NULL;
7a8e76a3
SR
3736}
3737
3738/**
3739 * ring_buffer_lock_reserve - reserve a part of the buffer
3740 * @buffer: the ring buffer to reserve from
3741 * @length: the length of the data to reserve (excluding event header)
7a8e76a3 3742 *
6167c205 3743 * Returns a reserved event on the ring buffer to copy directly to.
7a8e76a3
SR
3744 * The user of this interface will need to get the body to write into
3745 * and can use the ring_buffer_event_data() interface.
3746 *
3747 * The length is the length of the data needed, not the event length
3748 * which also includes the event header.
3749 *
3750 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
3751 * If NULL is returned, then nothing has been allocated or locked.
3752 */
3753struct ring_buffer_event *
13292494 3754ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length)
7a8e76a3
SR
3755{
3756 struct ring_buffer_per_cpu *cpu_buffer;
3757 struct ring_buffer_event *event;
5168ae50 3758 int cpu;
7a8e76a3 3759
bf41a158 3760 /* If we are tracing schedule, we don't want to recurse */
5168ae50 3761 preempt_disable_notrace();
bf41a158 3762
3205f806 3763 if (unlikely(atomic_read(&buffer->record_disabled)))
58a09ec6 3764 goto out;
261842b7 3765
7a8e76a3
SR
3766 cpu = raw_smp_processor_id();
3767
3205f806 3768 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
d769041f 3769 goto out;
7a8e76a3
SR
3770
3771 cpu_buffer = buffer->buffers[cpu];
7a8e76a3 3772
3205f806 3773 if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
d769041f 3774 goto out;
7a8e76a3 3775
139f8400 3776 if (unlikely(length > buffer->max_data_size))
bf41a158 3777 goto out;
7a8e76a3 3778
58a09ec6
SRRH
3779 if (unlikely(trace_recursive_lock(cpu_buffer)))
3780 goto out;
3781
62f0b3eb 3782 event = rb_reserve_next_event(buffer, cpu_buffer, length);
7a8e76a3 3783 if (!event)
58a09ec6 3784 goto out_unlock;
7a8e76a3
SR
3785
3786 return event;
3787
58a09ec6
SRRH
3788 out_unlock:
3789 trace_recursive_unlock(cpu_buffer);
d769041f 3790 out:
5168ae50 3791 preempt_enable_notrace();
7a8e76a3
SR
3792 return NULL;
3793}
c4f50183 3794EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
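/*
 * Minimal usage sketch (my_payload is a stand-in for whatever the
 * caller wants to record): reserve, fill the body, then commit. If the
 * reserve returns NULL, nothing was allocated or locked, so there is
 * nothing to commit.
 *
 *	struct ring_buffer_event *event;
 *	void *body;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(my_payload));
 *	if (!event)
 *		return;
 *	body = ring_buffer_event_data(event);
 *	memcpy(body, &my_payload, sizeof(my_payload));
 *	ring_buffer_unlock_commit(buffer);
 */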
7a8e76a3 3795
a1863c21
SR
3796/*
3797 * Decrement the entries to the page that an event is on.
3798 * The event does not even need to exist, only the pointer
3799 * to the page it is on. This may only be called before the commit
3800 * takes place.
3801 */
3802static inline void
3803rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3804 struct ring_buffer_event *event)
3805{
3806 unsigned long addr = (unsigned long)event;
3807 struct buffer_page *bpage = cpu_buffer->commit_page;
3808 struct buffer_page *start;
3809
3cb30911 3810 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);
a1863c21
SR
3811
3812 /* Do the likely case first */
3813 if (likely(bpage->page == (void *)addr)) {
3814 local_dec(&bpage->entries);
3815 return;
3816 }
3817
3818 /*
3819 * Because the commit page may be on the reader page we
3820 * start with the next page and check the end loop there.
3821 */
6689bed3 3822 rb_inc_page(&bpage);
a1863c21
SR
3823 start = bpage;
3824 do {
3825 if (bpage->page == (void *)addr) {
3826 local_dec(&bpage->entries);
3827 return;
3828 }
6689bed3 3829 rb_inc_page(&bpage);
a1863c21
SR
3830 } while (bpage != start);
3831
3832 /* commit not part of this buffer?? */
3833 RB_WARN_ON(cpu_buffer, 1);
3834}
3835
fa1b47dd 3836/**
88883490 3837 * ring_buffer_discard_commit - discard an event that has not been committed
fa1b47dd
SR
3838 * @buffer: the ring buffer
3839 * @event: non committed event to discard
3840 *
dc892f73
SR
3841 * Sometimes an event that is in the ring buffer needs to be ignored.
3842 * This function lets the user discard an event in the ring buffer
3843 * and then that event will not be read later.
3844 *
6167c205 3845 * This function only works if it is called before the item has been
dc892f73 3846 * committed. It will try to free the event from the ring buffer
fa1b47dd
SR
3847 * if another event has not been added behind it.
3848 *
3849 * If another event has been added behind it, it will set the event
3850 * up as discarded, and perform the commit.
3851 *
3852 * If this function is called, do not call ring_buffer_unlock_commit on
3853 * the event.
3854 */
13292494 3855void ring_buffer_discard_commit(struct trace_buffer *buffer,
fa1b47dd
SR
3856 struct ring_buffer_event *event)
3857{
3858 struct ring_buffer_per_cpu *cpu_buffer;
fa1b47dd
SR
3859 int cpu;
3860
3861 /* The event is discarded regardless */
f3b9aae1 3862 rb_event_discard(event);
fa1b47dd 3863
fa743953
SR
3864 cpu = smp_processor_id();
3865 cpu_buffer = buffer->buffers[cpu];
3866
fa1b47dd
SR
3867 /*
3868 * This must only be called if the event has not been
3869 * committed yet. Thus we can assume that preemption
3870 * is still disabled.
3871 */
fa743953 3872 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
fa1b47dd 3873
a1863c21 3874 rb_decrement_entry(cpu_buffer, event);
0f2541d2 3875 if (rb_try_to_discard(cpu_buffer, event))
edd813bf 3876 goto out;
fa1b47dd 3877
fa1b47dd 3878 out:
fa743953 3879 rb_end_commit(cpu_buffer);
fa1b47dd 3880
58a09ec6 3881 trace_recursive_unlock(cpu_buffer);
f3b9aae1 3882
5168ae50 3883 preempt_enable_notrace();
fa1b47dd
SR
3884
3885}
3886EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
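/*
 * Usage sketch (event_matches_filter() is a hypothetical predicate):
 * an event is reserved and filled, then either committed or discarded,
 * never both.
 *
 *	event = ring_buffer_lock_reserve(buffer, size);
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	... fill in entry ...
 *	if (event_matches_filter(entry))
 *		ring_buffer_unlock_commit(buffer);
 *	else
 *		ring_buffer_discard_commit(buffer, event);
 */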
3887
7a8e76a3
SR
3888/**
3889 * ring_buffer_write - write data to the buffer without reserving
3890 * @buffer: The ring buffer to write to.
3891 * @length: The length of the data being written (excluding the event header)
3892 * @data: The data to write to the buffer.
3893 *
3894 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3895 * one function. If you already have the data to write to the buffer, it
3896 * may be easier to simply call this function.
3897 *
3898 * Note, like ring_buffer_lock_reserve, the length is the length of the data
3899 * and not the length of the event which would hold the header.
3900 */
13292494 3901int ring_buffer_write(struct trace_buffer *buffer,
01e3e710
DS
3902 unsigned long length,
3903 void *data)
7a8e76a3
SR
3904{
3905 struct ring_buffer_per_cpu *cpu_buffer;
3906 struct ring_buffer_event *event;
7a8e76a3
SR
3907 void *body;
3908 int ret = -EBUSY;
5168ae50 3909 int cpu;
7a8e76a3 3910
5168ae50 3911 preempt_disable_notrace();
bf41a158 3912
52fbe9cd
LJ
3913 if (atomic_read(&buffer->record_disabled))
3914 goto out;
3915
7a8e76a3
SR
3916 cpu = raw_smp_processor_id();
3917
9e01c1b7 3918 if (!cpumask_test_cpu(cpu, buffer->cpumask))
d769041f 3919 goto out;
7a8e76a3
SR
3920
3921 cpu_buffer = buffer->buffers[cpu];
7a8e76a3
SR
3922
3923 if (atomic_read(&cpu_buffer->record_disabled))
3924 goto out;
3925
139f8400 3926 if (length > buffer->max_data_size)
be957c44
SR
3927 goto out;
3928
985e871b
SRRH
3929 if (unlikely(trace_recursive_lock(cpu_buffer)))
3930 goto out;
3931
62f0b3eb 3932 event = rb_reserve_next_event(buffer, cpu_buffer, length);
7a8e76a3 3933 if (!event)
985e871b 3934 goto out_unlock;
7a8e76a3
SR
3935
3936 body = rb_event_data(event);
3937
3938 memcpy(body, data, length);
3939
04aabc32 3940 rb_commit(cpu_buffer);
7a8e76a3 3941
15693458
SRRH
3942 rb_wakeups(buffer, cpu_buffer);
3943
7a8e76a3 3944 ret = 0;
985e871b
SRRH
3945
3946 out_unlock:
3947 trace_recursive_unlock(cpu_buffer);
3948
7a8e76a3 3949 out:
5168ae50 3950 preempt_enable_notrace();
7a8e76a3
SR
3951
3952 return ret;
3953}
c4f50183 3954EXPORT_SYMBOL_GPL(ring_buffer_write);
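/*
 * Minimal sketch (struct my_entry is an assumption): when the payload
 * is already laid out in memory, a single call replaces the
 * reserve/commit pair. A non-zero return means nothing was written.
 *
 *	struct my_entry entry = { .value = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(entry), &entry))
 *		pr_debug("ring buffer write failed\n");
 */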
7a8e76a3 3955
da58834c 3956static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
bf41a158
SR
3957{
3958 struct buffer_page *reader = cpu_buffer->reader_page;
77ae365e 3959 struct buffer_page *head = rb_set_head_page(cpu_buffer);
bf41a158
SR
3960 struct buffer_page *commit = cpu_buffer->commit_page;
3961
77ae365e
SR
3962 /* In case of error, head will be NULL */
3963 if (unlikely(!head))
da58834c 3964 return true;
77ae365e 3965
67f0d6d9 3966 /* Reader should exhaust content in reader page */
fe832be0 3967 if (reader->read != rb_page_size(reader))
67f0d6d9
HL
3968 return false;
3969
3970 /*
3971 * If writers are committing on the reader page, knowing all
3972 * committed content has been read, the ring buffer is empty.
3973 */
3974 if (commit == reader)
3975 return true;
3976
3977 /*
3978 * If writers are committing on a page other than reader page
3979 * and head page, there should always be content to read.
3980 */
3981 if (commit != head)
3982 return false;
3983
3984 /*
3985 * Writers are committing on the head page. We just need
3986 * to care about whether any data has been committed; the reader
3987 * will swap the reader page with the head page when it reads.
3988 */
3989 return rb_page_commit(commit) == 0;
bf41a158
SR
3990}
3991
7a8e76a3
SR
3992/**
3993 * ring_buffer_record_disable - stop all writes into the buffer
3994 * @buffer: The ring buffer to stop writes to.
3995 *
3996 * This prevents all writes to the buffer. Any attempt to write
3997 * to the buffer after this will fail and return NULL.
3998 *
74401729 3999 * The caller should call synchronize_rcu() after this.
7a8e76a3 4000 */
13292494 4001void ring_buffer_record_disable(struct trace_buffer *buffer)
7a8e76a3
SR
4002{
4003 atomic_inc(&buffer->record_disabled);
4004}
c4f50183 4005EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
7a8e76a3
SR
4006
4007/**
4008 * ring_buffer_record_enable - enable writes to the buffer
4009 * @buffer: The ring buffer to enable writes
4010 *
4011 * Note, multiple disables will need the same number of enables
c41b20e7 4012 * to truly enable the writing (much like preempt_disable).
7a8e76a3 4013 */
13292494 4014void ring_buffer_record_enable(struct trace_buffer *buffer)
7a8e76a3
SR
4015{
4016 atomic_dec(&buffer->record_disabled);
4017}
c4f50183 4018EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
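/*
 * Sketch of the pattern ring_buffer_record_disable() asks for
 * (illustration only): quiesce in-flight writers before touching the
 * buffer, then re-enable recording.
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_rcu();
 *	... safely inspect or reset the buffer ...
 *	ring_buffer_record_enable(buffer);
 */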
7a8e76a3 4019
499e5470
SR
4020/**
4021 * ring_buffer_record_off - stop all writes into the buffer
4022 * @buffer: The ring buffer to stop writes to.
4023 *
4024 * This prevents all writes to the buffer. Any attempt to write
4025 * to the buffer after this will fail and return NULL.
4026 *
4027 * This is different than ring_buffer_record_disable() as
87abb3b1 4028 * it works like an on/off switch, whereas the disable() version
499e5470
SR
4029 * must be paired with an enable().
4030 */
13292494 4031void ring_buffer_record_off(struct trace_buffer *buffer)
499e5470
SR
4032{
4033 unsigned int rd;
4034 unsigned int new_rd;
4035
8328e36d 4036 rd = atomic_read(&buffer->record_disabled);
499e5470 4037 do {
499e5470 4038 new_rd = rd | RB_BUFFER_OFF;
8328e36d 4039 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
499e5470
SR
4040}
4041EXPORT_SYMBOL_GPL(ring_buffer_record_off);
4042
4043/**
4044 * ring_buffer_record_on - restart writes into the buffer
4045 * @buffer: The ring buffer to start writes to.
4046 *
4047 * This enables all writes to the buffer that was disabled by
4048 * ring_buffer_record_off().
4049 *
4050 * This is different than ring_buffer_record_enable() as
87abb3b1 4051 * it works like an on/off switch, whereas the enable() version
499e5470
SR
4052 * must be paired with a disable().
4053 */
13292494 4054void ring_buffer_record_on(struct trace_buffer *buffer)
499e5470
SR
4055{
4056 unsigned int rd;
4057 unsigned int new_rd;
4058
8328e36d 4059 rd = atomic_read(&buffer->record_disabled);
499e5470 4060 do {
499e5470 4061 new_rd = rd & ~RB_BUFFER_OFF;
8328e36d 4062 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
499e5470
SR
4063}
4064EXPORT_SYMBOL_GPL(ring_buffer_record_on);
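/*
 * Illustration of the on/off switch semantics (assuming no
 * ring_buffer_record_disable() is pending): unlike the counter based
 * disable/enable calls above, unbalanced off/on calls are fine.
 *
 *	ring_buffer_record_off(buffer);
 *	ring_buffer_record_off(buffer);		(still simply "off")
 *	ring_buffer_record_on(buffer);		(writes are enabled again)
 *	WARN_ON(!ring_buffer_record_is_on(buffer));
 */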
4065
4066/**
4067 * ring_buffer_record_is_on - return true if the ring buffer can write
4068 * @buffer: The ring buffer to see if write is enabled
4069 *
4070 * Returns true if the ring buffer is in a state that it accepts writes.
4071 */
13292494 4072bool ring_buffer_record_is_on(struct trace_buffer *buffer)
499e5470
SR
4073{
4074 return !atomic_read(&buffer->record_disabled);
4075}
4076
73c8d894
MH
4077/**
4078 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
4079 * @buffer: The ring buffer to see if write is set enabled
4080 *
4081 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
4082 * Note that this does NOT mean it is in a writable state.
4083 *
4084 * It may return true when the ring buffer has been disabled by
4085 * ring_buffer_record_disable(), as that is a temporary disabling of
4086 * the ring buffer.
4087 */
13292494 4088bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
73c8d894
MH
4089{
4090 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
4091}
4092
7a8e76a3
SR
4093/**
4094 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
4095 * @buffer: The ring buffer to stop writes to.
4096 * @cpu: The CPU buffer to stop
4097 *
4098 * This prevents all writes to the buffer. Any attempt to write
4099 * to the buffer after this will fail and return NULL.
4100 *
74401729 4101 * The caller should call synchronize_rcu() after this.
7a8e76a3 4102 */
13292494 4103void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu)
7a8e76a3
SR
4104{
4105 struct ring_buffer_per_cpu *cpu_buffer;
4106
9e01c1b7 4107 if (!cpumask_test_cpu(cpu, buffer->cpumask))
8aabee57 4108 return;
7a8e76a3
SR
4109
4110 cpu_buffer = buffer->buffers[cpu];
4111 atomic_inc(&cpu_buffer->record_disabled);
4112}
c4f50183 4113EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
7a8e76a3
SR
4114
4115/**
4116 * ring_buffer_record_enable_cpu - enable writes to the buffer
4117 * @buffer: The ring buffer to enable writes
4118 * @cpu: The CPU to enable.
4119 *
4120 * Note, multiple disables will need the same number of enables
c41b20e7 4121 * to truly enable the writing (much like preempt_disable).
7a8e76a3 4122 */
13292494 4123void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu)
7a8e76a3
SR
4124{
4125 struct ring_buffer_per_cpu *cpu_buffer;
4126
9e01c1b7 4127 if (!cpumask_test_cpu(cpu, buffer->cpumask))
8aabee57 4128 return;
7a8e76a3
SR
4129
4130 cpu_buffer = buffer->buffers[cpu];
4131 atomic_dec(&cpu_buffer->record_disabled);
4132}
c4f50183 4133EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
7a8e76a3 4134
f6195aa0
SR
4135/*
4136 * The total entries in the ring buffer is the running counter
4137 * of entries entered into the ring buffer, minus the sum of
4138 * the entries read from the ring buffer and the number of
4139 * entries that were overwritten.
4140 */
4141static inline unsigned long
4142rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
4143{
4144 return local_read(&cpu_buffer->entries) -
4145 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
4146}
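/*
 * Worked example with made-up numbers: if 1000 events were written,
 * the writer overwrote 200 of them while wrapping around and 300 have
 * already been read, then 1000 - (200 + 300) = 500 entries remain to
 * be consumed.
 */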
4147
c64e148a
VN
4148/**
4149 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
4150 * @buffer: The ring buffer
4151 * @cpu: The per CPU buffer to read from.
4152 */
13292494 4153u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu)
c64e148a
VN
4154{
4155 unsigned long flags;
4156 struct ring_buffer_per_cpu *cpu_buffer;
4157 struct buffer_page *bpage;
da830e58 4158 u64 ret = 0;
c64e148a
VN
4159
4160 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4161 return 0;
4162
4163 cpu_buffer = buffer->buffers[cpu];
7115e3fc 4164 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
c64e148a
VN
4165 /*
4166 * if the tail is on reader_page, oldest time stamp is on the reader
4167 * page
4168 */
4169 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
4170 bpage = cpu_buffer->reader_page;
4171 else
4172 bpage = rb_set_head_page(cpu_buffer);
54f7be5b
SR
4173 if (bpage)
4174 ret = bpage->page->time_stamp;
7115e3fc 4175 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
c64e148a
VN
4176
4177 return ret;
4178}
4179EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
4180
4181/**
45d99ea4 4182 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer
c64e148a
VN
4183 * @buffer: The ring buffer
4184 * @cpu: The per CPU buffer to read from.
4185 */
13292494 4186unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu)
c64e148a
VN
4187{
4188 struct ring_buffer_per_cpu *cpu_buffer;
4189 unsigned long ret;
4190
4191 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4192 return 0;
4193
4194 cpu_buffer = buffer->buffers[cpu];
4195 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
4196
4197 return ret;
4198}
4199EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
4200
7a8e76a3
SR
4201/**
4202 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
4203 * @buffer: The ring buffer
4204 * @cpu: The per CPU buffer to get the entries from.
4205 */
13292494 4206unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu)
7a8e76a3
SR
4207{
4208 struct ring_buffer_per_cpu *cpu_buffer;
4209
9e01c1b7 4210 if (!cpumask_test_cpu(cpu, buffer->cpumask))
8aabee57 4211 return 0;
7a8e76a3
SR
4212
4213 cpu_buffer = buffer->buffers[cpu];
554f786e 4214
f6195aa0 4215 return rb_num_of_entries(cpu_buffer);
7a8e76a3 4216}
c4f50183 4217EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
7a8e76a3
SR
4218
4219/**
884bfe89
SP
4220 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
4221 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
7a8e76a3
SR
4222 * @buffer: The ring buffer
4223 * @cpu: The per CPU buffer to get the number of overruns from
4224 */
13292494 4225unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu)
7a8e76a3
SR
4226{
4227 struct ring_buffer_per_cpu *cpu_buffer;
8aabee57 4228 unsigned long ret;
7a8e76a3 4229
9e01c1b7 4230 if (!cpumask_test_cpu(cpu, buffer->cpumask))
8aabee57 4231 return 0;
7a8e76a3
SR
4232
4233 cpu_buffer = buffer->buffers[cpu];
77ae365e 4234 ret = local_read(&cpu_buffer->overrun);
554f786e
SR
4235
4236 return ret;
7a8e76a3 4237}
c4f50183 4238EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
7a8e76a3 4239
f0d2c681 4240/**
884bfe89
SP
4241 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
4242 * commits failing due to the buffer wrapping around while there are uncommitted
4243 * events, such as during an interrupt storm.
f0d2c681
SR
4244 * @buffer: The ring buffer
4245 * @cpu: The per CPU buffer to get the number of overruns from
4246 */
4247unsigned long
13292494 4248ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu)
f0d2c681
SR
4249{
4250 struct ring_buffer_per_cpu *cpu_buffer;
4251 unsigned long ret;
4252
4253 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4254 return 0;
4255
4256 cpu_buffer = buffer->buffers[cpu];
77ae365e 4257 ret = local_read(&cpu_buffer->commit_overrun);
f0d2c681
SR
4258
4259 return ret;
4260}
4261EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
4262
884bfe89
SP
4263/**
4264 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
4265 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
4266 * @buffer: The ring buffer
4267 * @cpu: The per CPU buffer to get the number of overruns from
4268 */
4269unsigned long
13292494 4270ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu)
884bfe89
SP
4271{
4272 struct ring_buffer_per_cpu *cpu_buffer;
4273 unsigned long ret;
4274
4275 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4276 return 0;
4277
4278 cpu_buffer = buffer->buffers[cpu];
4279 ret = local_read(&cpu_buffer->dropped_events);
4280
4281 return ret;
4282}
4283EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
4284
ad964704
SRRH
4285/**
4286 * ring_buffer_read_events_cpu - get the number of events successfully read
4287 * @buffer: The ring buffer
4288 * @cpu: The per CPU buffer to get the number of events read
4289 */
4290unsigned long
13292494 4291ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu)
ad964704
SRRH
4292{
4293 struct ring_buffer_per_cpu *cpu_buffer;
4294
4295 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4296 return 0;
4297
4298 cpu_buffer = buffer->buffers[cpu];
4299 return cpu_buffer->read;
4300}
4301EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
4302
7a8e76a3
SR
4303/**
4304 * ring_buffer_entries - get the number of entries in a buffer
4305 * @buffer: The ring buffer
4306 *
4307 * Returns the total number of entries in the ring buffer
4308 * (all CPU entries)
4309 */
13292494 4310unsigned long ring_buffer_entries(struct trace_buffer *buffer)
7a8e76a3
SR
4311{
4312 struct ring_buffer_per_cpu *cpu_buffer;
4313 unsigned long entries = 0;
4314 int cpu;
4315
4316 /* if you care about this being correct, lock the buffer */
4317 for_each_buffer_cpu(buffer, cpu) {
4318 cpu_buffer = buffer->buffers[cpu];
f6195aa0 4319 entries += rb_num_of_entries(cpu_buffer);
7a8e76a3
SR
4320 }
4321
4322 return entries;
4323}
c4f50183 4324EXPORT_SYMBOL_GPL(ring_buffer_entries);
7a8e76a3
SR
4325
4326/**
67b394f7 4327 * ring_buffer_overruns - get the number of overruns in buffer
7a8e76a3
SR
4328 * @buffer: The ring buffer
4329 *
4330 * Returns the total number of overruns in the ring buffer
4331 * (all CPU entries)
4332 */
13292494 4333unsigned long ring_buffer_overruns(struct trace_buffer *buffer)
7a8e76a3
SR
4334{
4335 struct ring_buffer_per_cpu *cpu_buffer;
4336 unsigned long overruns = 0;
4337 int cpu;
4338
4339 /* if you care about this being correct, lock the buffer */
4340 for_each_buffer_cpu(buffer, cpu) {
4341 cpu_buffer = buffer->buffers[cpu];
77ae365e 4342 overruns += local_read(&cpu_buffer->overrun);
7a8e76a3
SR
4343 }
4344
4345 return overruns;
4346}
c4f50183 4347EXPORT_SYMBOL_GPL(ring_buffer_overruns);
7a8e76a3 4348
642edba5 4349static void rb_iter_reset(struct ring_buffer_iter *iter)
7a8e76a3
SR
4350{
4351 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4352
d769041f 4353 /* Iterator usage is expected to have record disabled */
651e22f2
SRRH
4354 iter->head_page = cpu_buffer->reader_page;
4355 iter->head = cpu_buffer->reader_page->read;
785888c5 4356 iter->next_event = iter->head;
651e22f2
SRRH
4357
4358 iter->cache_reader_page = iter->head_page;
24607f11 4359 iter->cache_read = cpu_buffer->read;
2d093282 4360 iter->cache_pages_removed = cpu_buffer->pages_removed;
651e22f2 4361
28e3fc56 4362 if (iter->head) {
d769041f 4363 iter->read_stamp = cpu_buffer->read_stamp;
28e3fc56
SRV
4364 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp;
4365 } else {
abc9b56d 4366 iter->read_stamp = iter->head_page->page->time_stamp;
28e3fc56
SRV
4367 iter->page_stamp = iter->read_stamp;
4368 }
642edba5 4369}
f83c9d0f 4370
642edba5
SR
4371/**
4372 * ring_buffer_iter_reset - reset an iterator
4373 * @iter: The iterator to reset
4374 *
4375 * Resets the iterator, so that it will start from the beginning
4376 * again.
4377 */
4378void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
4379{
554f786e 4380 struct ring_buffer_per_cpu *cpu_buffer;
642edba5
SR
4381 unsigned long flags;
4382
554f786e
SR
4383 if (!iter)
4384 return;
4385
4386 cpu_buffer = iter->cpu_buffer;
4387
5389f6fa 4388 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
642edba5 4389 rb_iter_reset(iter);
5389f6fa 4390 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
7a8e76a3 4391}
c4f50183 4392EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
7a8e76a3
SR
4393
4394/**
4395 * ring_buffer_iter_empty - check if an iterator has no more to read
4396 * @iter: The iterator to check
4397 */
4398int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
4399{
4400 struct ring_buffer_per_cpu *cpu_buffer;
78f7a45d
SRV
4401 struct buffer_page *reader;
4402 struct buffer_page *head_page;
4403 struct buffer_page *commit_page;
ead6ecfd 4404 struct buffer_page *curr_commit_page;
78f7a45d 4405 unsigned commit;
ead6ecfd
SRV
4406 u64 curr_commit_ts;
4407 u64 commit_ts;
7a8e76a3
SR
4408
4409 cpu_buffer = iter->cpu_buffer;
78f7a45d
SRV
4410 reader = cpu_buffer->reader_page;
4411 head_page = cpu_buffer->head_page;
f1e30cb6 4412 commit_page = READ_ONCE(cpu_buffer->commit_page);
ead6ecfd
SRV
4413 commit_ts = commit_page->page->time_stamp;
4414
4415 /*
4416 * When the writer goes across pages, it issues a cmpxchg which
4417 * is a mb(), which will synchronize with the rmb here.
4418 * (see rb_tail_page_update())
4419 */
4420 smp_rmb();
78f7a45d 4421 commit = rb_page_commit(commit_page);
ead6ecfd
SRV
4422 /* We want to make sure that the commit page doesn't change */
4423 smp_rmb();
4424
4425 /* Make sure commit page didn't change */
4426 curr_commit_page = READ_ONCE(cpu_buffer->commit_page);
4427 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp);
4428
4429 /* If the commit page changed, then there's more data */
4430 if (curr_commit_page != commit_page ||
4431 curr_commit_ts != commit_ts)
4432 return 0;
78f7a45d 4433
ead6ecfd 4434 /* Still racy, as it may return a false positive, but that's OK */
785888c5 4435 return ((iter->head_page == commit_page && iter->head >= commit) ||
78f7a45d
SRV
4436 (iter->head_page == reader && commit_page == head_page &&
4437 head_page->read == commit &&
fe832be0 4438 iter->head == rb_page_size(cpu_buffer->reader_page)));
7a8e76a3 4439}
c4f50183 4440EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
7a8e76a3
SR
4441
4442static void
4443rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
4444 struct ring_buffer_event *event)
4445{
4446 u64 delta;
4447
334d4169 4448 switch (event->type_len) {
7a8e76a3
SR
4449 case RINGBUF_TYPE_PADDING:
4450 return;
4451
4452 case RINGBUF_TYPE_TIME_EXTEND:
e20044f7 4453 delta = rb_event_time_stamp(event);
7a8e76a3
SR
4454 cpu_buffer->read_stamp += delta;
4455 return;
4456
4457 case RINGBUF_TYPE_TIME_STAMP:
e20044f7 4458 delta = rb_event_time_stamp(event);
6695da58 4459 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp);
dc4e2801 4460 cpu_buffer->read_stamp = delta;
7a8e76a3
SR
4461 return;
4462
4463 case RINGBUF_TYPE_DATA:
4464 cpu_buffer->read_stamp += event->time_delta;
4465 return;
4466
4467 default:
da4d401a 4468 RB_WARN_ON(cpu_buffer, 1);
7a8e76a3 4469 }
7a8e76a3
SR
4470}
4471
4472static void
4473rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
4474 struct ring_buffer_event *event)
4475{
4476 u64 delta;
4477
334d4169 4478 switch (event->type_len) {
7a8e76a3
SR
4479 case RINGBUF_TYPE_PADDING:
4480 return;
4481
4482 case RINGBUF_TYPE_TIME_EXTEND:
e20044f7 4483 delta = rb_event_time_stamp(event);
7a8e76a3
SR
4484 iter->read_stamp += delta;
4485 return;
4486
4487 case RINGBUF_TYPE_TIME_STAMP:
e20044f7 4488 delta = rb_event_time_stamp(event);
6695da58 4489 delta = rb_fix_abs_ts(delta, iter->read_stamp);
dc4e2801 4490 iter->read_stamp = delta;
7a8e76a3
SR
4491 return;
4492
4493 case RINGBUF_TYPE_DATA:
4494 iter->read_stamp += event->time_delta;
4495 return;
4496
4497 default:
da4d401a 4498 RB_WARN_ON(iter->cpu_buffer, 1);
7a8e76a3 4499 }
7a8e76a3
SR
4500}
4501
d769041f
SR
4502static struct buffer_page *
4503rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
7a8e76a3 4504{
d769041f 4505 struct buffer_page *reader = NULL;
139f8400 4506 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
66a8cb95 4507 unsigned long overwrite;
d769041f 4508 unsigned long flags;
818e3dd3 4509 int nr_loops = 0;
bc92b956 4510 bool ret;
d769041f 4511
3e03fb7f 4512 local_irq_save(flags);
0199c4e6 4513 arch_spin_lock(&cpu_buffer->lock);
d769041f
SR
4514
4515 again:
818e3dd3
SR
4516 /*
4517 * This should normally only loop twice. But because the
4518 * start of the reader inserts an empty page, it causes
4519 * a case where we will loop three times. There should be no
4520 * reason to loop four times (that I know of).
4521 */
3e89c7bb 4522 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
818e3dd3
SR
4523 reader = NULL;
4524 goto out;
4525 }
4526
d769041f
SR
4527 reader = cpu_buffer->reader_page;
4528
4529 /* If there's more to read, return this page */
bf41a158 4530 if (cpu_buffer->reader_page->read < rb_page_size(reader))
d769041f
SR
4531 goto out;
4532
4533 /* Never should we have an index greater than the size */
3e89c7bb
SR
4534 if (RB_WARN_ON(cpu_buffer,
4535 cpu_buffer->reader_page->read > rb_page_size(reader)))
4536 goto out;
d769041f
SR
4537
4538 /* check if we caught up to the tail */
4539 reader = NULL;
bf41a158 4540 if (cpu_buffer->commit_page == cpu_buffer->reader_page)
d769041f 4541 goto out;
7a8e76a3 4542
a5fb8331
SR
4543 /* Don't bother swapping if the ring buffer is empty */
4544 if (rb_num_of_entries(cpu_buffer) == 0)
4545 goto out;
4546
7a8e76a3 4547 /*
d769041f 4548 * Reset the reader page to size zero.
7a8e76a3 4549 */
77ae365e
SR
4550 local_set(&cpu_buffer->reader_page->write, 0);
4551 local_set(&cpu_buffer->reader_page->entries, 0);
4552 local_set(&cpu_buffer->reader_page->page->commit, 0);
ff0ff84a 4553 cpu_buffer->reader_page->real_end = 0;
7a8e76a3 4554
77ae365e
SR
4555 spin:
4556 /*
4557 * Splice the empty reader page into the list around the head.
4558 */
4559 reader = rb_set_head_page(cpu_buffer);
54f7be5b
SR
4560 if (!reader)
4561 goto out;
0e1ff5d7 4562 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
d769041f 4563 cpu_buffer->reader_page->list.prev = reader->list.prev;
bf41a158 4564
3adc54fa
SR
4565 /*
4566 * cpu_buffer->pages just needs to point to the buffer, it
4567 * has no specific buffer page to point to. Lets move it out
25985edc 4568 * of our way so we don't accidentally swap it.
3adc54fa
SR
4569 */
4570 cpu_buffer->pages = reader->list.prev;
4571
77ae365e 4572 /* The reader page will be pointing to the new head */
6689bed3 4573 rb_set_list_to_head(&cpu_buffer->reader_page->list);
7a8e76a3 4574
66a8cb95
SR
4575 /*
4576 * We want to make sure we read the overruns after we set up our
4577 * pointers to the next object. The writer side does a
4578 * cmpxchg to cross pages which acts as the mb on the writer
4579 * side. Note, the reader will constantly fail the swap
4580 * while the writer is updating the pointers, so this
4581 * guarantees that the overwrite recorded here is the one we
4582 * want to compare with the last_overrun.
4583 */
4584 smp_mb();
4585 overwrite = local_read(&(cpu_buffer->overrun));
4586
77ae365e
SR
4587 /*
4588 * Here's the tricky part.
4589 *
4590 * We need to move the pointer past the header page.
4591 * But we can only do that if a writer is not currently
4592 * moving it. The page before the header page has the
4593 * flag bit '1' set if it is pointing to the page we want,
4594 * but if the writer is in the process of moving it
4595 * then it will be '2' or already moved '0'.
4596 */
4597
4598 ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
7a8e76a3
SR
4599
4600 /*
77ae365e 4601 * If we did not convert it, then we must try again.
7a8e76a3 4602 */
77ae365e
SR
4603 if (!ret)
4604 goto spin;
7a8e76a3 4605
77ae365e 4606 /*
2c2b0a78 4607 * Yay! We succeeded in replacing the page.
77ae365e
SR
4608 *
4609 * Now make the new head point back to the reader page.
4610 */
5ded3dc6 4611 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
6689bed3 4612 rb_inc_page(&cpu_buffer->head_page);
d769041f 4613
2c2b0a78
SRV
4614 local_inc(&cpu_buffer->pages_read);
4615
d769041f
SR
4616 /* Finally update the reader page to the new head */
4617 cpu_buffer->reader_page = reader;
b81f472a 4618 cpu_buffer->reader_page->read = 0;
d769041f 4619
66a8cb95
SR
4620 if (overwrite != cpu_buffer->last_overrun) {
4621 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
4622 cpu_buffer->last_overrun = overwrite;
4623 }
4624
d769041f
SR
4625 goto again;
4626
4627 out:
b81f472a
SRRH
4628 /* Update the read_stamp on the first event */
4629 if (reader && reader->read == 0)
4630 cpu_buffer->read_stamp = reader->page->time_stamp;
4631
0199c4e6 4632 arch_spin_unlock(&cpu_buffer->lock);
3e03fb7f 4633 local_irq_restore(flags);
d769041f 4634
a0fcaaed
SRG
4635 /*
4636 * The writer has preemption disabled, so wait for it. But not forever:
4637 * 1 second is pretty much "forever".
4638 */
4639#define USECS_WAIT 1000000
4640 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
4641 /* If the write is past the end of page, a writer is still updating it */
139f8400 4642 if (likely(!reader || rb_page_write(reader) <= bsize))
a0fcaaed
SRG
4643 break;
4644
4645 udelay(1);
4646
4647 /* Get the latest version of the reader write value */
4648 smp_rmb();
4649 }
4650
4651 /* The writer is not moving forward? Something is wrong */
4652 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
4653 reader = NULL;
4654
4655 /*
4656 * Make sure we see any padding after the write update
6455b616
ZY
4657 * (see rb_reset_tail()).
4658 *
4659 * In addition, a writer may be writing on the reader page
4660 * if the page has not been fully filled, so the read barrier
4661 * is also needed to make sure we see the content of what is
4662 * committed by the writer (see rb_set_commit_to_write()).
a0fcaaed
SRG
4663 */
4664 smp_rmb();
4665
4666
d769041f
SR
4667 return reader;
4668}
4669
4670static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
4671{
4672 struct ring_buffer_event *event;
4673 struct buffer_page *reader;
4674 unsigned length;
4675
4676 reader = rb_get_reader_page(cpu_buffer);
7a8e76a3 4677
d769041f 4678 /* This function should not be called when buffer is empty */
3e89c7bb
SR
4679 if (RB_WARN_ON(cpu_buffer, !reader))
4680 return;
7a8e76a3 4681
d769041f
SR
4682 event = rb_reader_event(cpu_buffer);
4683
a1863c21 4684 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
e4906eff 4685 cpu_buffer->read++;
d769041f
SR
4686
4687 rb_update_read_stamp(cpu_buffer, event);
4688
4689 length = rb_event_length(event);
6f807acd 4690 cpu_buffer->reader_page->read += length;
45d99ea4 4691 cpu_buffer->read_bytes += length;
7a8e76a3
SR
4692}
4693
4694static void rb_advance_iter(struct ring_buffer_iter *iter)
4695{
7a8e76a3 4696 struct ring_buffer_per_cpu *cpu_buffer;
7a8e76a3
SR
4697
4698 cpu_buffer = iter->cpu_buffer;
7a8e76a3 4699
785888c5
SRV
4700 /* If head == next_event then we need to jump to the next event */
4701 if (iter->head == iter->next_event) {
4702 /* If the event gets overwritten again, there's nothing to do */
4703 if (rb_iter_head_event(iter) == NULL)
4704 return;
4705 }
4706
4707 iter->head = iter->next_event;
4708
7a8e76a3
SR
4709 /*
4710 * Check if we are at the end of the buffer.
4711 */
785888c5 4712 if (iter->next_event >= rb_page_size(iter->head_page)) {
ea05b57c
SR
4713 /* discarded commits can make the page empty */
4714 if (iter->head_page == cpu_buffer->commit_page)
3e89c7bb 4715 return;
d769041f 4716 rb_inc_iter(iter);
7a8e76a3
SR
4717 return;
4718 }
4719
785888c5 4720 rb_update_iter_read_stamp(iter, iter->event);
7a8e76a3
SR
4721}
4722
66a8cb95
SR
4723static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
4724{
4725 return cpu_buffer->lost_events;
4726}
4727
f83c9d0f 4728static struct ring_buffer_event *
66a8cb95
SR
4729rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
4730 unsigned long *lost_events)
7a8e76a3 4731{
7a8e76a3 4732 struct ring_buffer_event *event;
d769041f 4733 struct buffer_page *reader;
818e3dd3 4734 int nr_loops = 0;
7a8e76a3 4735
dc4e2801
TZ
4736 if (ts)
4737 *ts = 0;
7a8e76a3 4738 again:
818e3dd3 4739 /*
69d1b839
SR
4740 * We repeat when a time extend is encountered.
4741 * Since the time extend is always attached to a data event,
4742 * we should never loop more than once.
4743 * (We never hit the following condition more than twice).
818e3dd3 4744 */
69d1b839 4745 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
818e3dd3 4746 return NULL;
818e3dd3 4747
d769041f
SR
4748 reader = rb_get_reader_page(cpu_buffer);
4749 if (!reader)
7a8e76a3
SR
4750 return NULL;
4751
d769041f 4752 event = rb_reader_event(cpu_buffer);
7a8e76a3 4753
334d4169 4754 switch (event->type_len) {
7a8e76a3 4755 case RINGBUF_TYPE_PADDING:
2d622719
TZ
4756 if (rb_null_event(event))
4757 RB_WARN_ON(cpu_buffer, 1);
4758 /*
4759 * Because the writer could be discarding every
4760 * event it creates (which would probably be bad),
4761 * if we were to go back to "again" then we may never
4762 * catch up, and will trigger the warn on, or lock
4763 * the box. Return the padding, and we will release
4764 * the current locks, and try again.
4765 */
2d622719 4766 return event;
7a8e76a3
SR
4767
4768 case RINGBUF_TYPE_TIME_EXTEND:
4769 /* Internal data, OK to advance */
d769041f 4770 rb_advance_reader(cpu_buffer);
7a8e76a3
SR
4771 goto again;
4772
4773 case RINGBUF_TYPE_TIME_STAMP:
dc4e2801 4774 if (ts) {
e20044f7 4775 *ts = rb_event_time_stamp(event);
6695da58 4776 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp);
dc4e2801
TZ
4777 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4778 cpu_buffer->cpu, ts);
4779 }
4780 /* Internal data, OK to advance */
d769041f 4781 rb_advance_reader(cpu_buffer);
7a8e76a3
SR
4782 goto again;
4783
4784 case RINGBUF_TYPE_DATA:
dc4e2801 4785 if (ts && !(*ts)) {
7a8e76a3 4786 *ts = cpu_buffer->read_stamp + event->time_delta;
d8eeb2d3 4787 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
37886f6a 4788 cpu_buffer->cpu, ts);
7a8e76a3 4789 }
66a8cb95
SR
4790 if (lost_events)
4791 *lost_events = rb_lost_events(cpu_buffer);
7a8e76a3
SR
4792 return event;
4793
4794 default:
da4d401a 4795 RB_WARN_ON(cpu_buffer, 1);
7a8e76a3
SR
4796 }
4797
4798 return NULL;
4799}
c4f50183 4800EXPORT_SYMBOL_GPL(ring_buffer_peek);
7a8e76a3 4801
f83c9d0f
SR
4802static struct ring_buffer_event *
4803rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
7a8e76a3 4804{
13292494 4805 struct trace_buffer *buffer;
7a8e76a3
SR
4806 struct ring_buffer_per_cpu *cpu_buffer;
4807 struct ring_buffer_event *event;
818e3dd3 4808 int nr_loops = 0;
7a8e76a3 4809
dc4e2801
TZ
4810 if (ts)
4811 *ts = 0;
4812
7a8e76a3
SR
4813 cpu_buffer = iter->cpu_buffer;
4814 buffer = cpu_buffer->buffer;
4815
492a74f4 4816 /*
2d093282
ZY
4817 * Check if someone performed a consuming read to the buffer
4818 * or removed some pages from the buffer. In these cases,
4819 * iterator was invalidated and we need to reset it.
492a74f4
SR
4820 */
4821 if (unlikely(iter->cache_read != cpu_buffer->read ||
2d093282
ZY
4822 iter->cache_reader_page != cpu_buffer->reader_page ||
4823 iter->cache_pages_removed != cpu_buffer->pages_removed))
492a74f4
SR
4824 rb_iter_reset(iter);
4825
7a8e76a3 4826 again:
3c05d748
SR
4827 if (ring_buffer_iter_empty(iter))
4828 return NULL;
4829
818e3dd3 4830 /*
3d2353de
SRV
4831 * As the writer can mess with what the iterator is trying
4832 * to read, just give up if we fail to get an event after
4833 * three tries. The iterator is not as reliable when reading
4834 * the ring buffer with an active write as the consumer is.
4835 * Do not warn when the three failures are reached.
818e3dd3 4836 */
3d2353de 4837 if (++nr_loops > 3)
818e3dd3 4838 return NULL;
818e3dd3 4839
7a8e76a3
SR
4840 if (rb_per_cpu_empty(cpu_buffer))
4841 return NULL;
4842
10e83fd0 4843 if (iter->head >= rb_page_size(iter->head_page)) {
3c05d748
SR
4844 rb_inc_iter(iter);
4845 goto again;
4846 }
4847
7a8e76a3 4848 event = rb_iter_head_event(iter);
3d2353de 4849 if (!event)
785888c5 4850 goto again;
7a8e76a3 4851
334d4169 4852 switch (event->type_len) {
7a8e76a3 4853 case RINGBUF_TYPE_PADDING:
2d622719
TZ
4854 if (rb_null_event(event)) {
4855 rb_inc_iter(iter);
4856 goto again;
4857 }
4858 rb_advance_iter(iter);
4859 return event;
7a8e76a3
SR
4860
4861 case RINGBUF_TYPE_TIME_EXTEND:
4862 /* Internal data, OK to advance */
4863 rb_advance_iter(iter);
4864 goto again;
4865
4866 case RINGBUF_TYPE_TIME_STAMP:
dc4e2801 4867 if (ts) {
e20044f7 4868 *ts = rb_event_time_stamp(event);
6695da58 4869 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp);
dc4e2801
TZ
4870 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4871 cpu_buffer->cpu, ts);
4872 }
4873 /* Internal data, OK to advance */
7a8e76a3
SR
4874 rb_advance_iter(iter);
4875 goto again;
4876
4877 case RINGBUF_TYPE_DATA:
dc4e2801 4878 if (ts && !(*ts)) {
7a8e76a3 4879 *ts = iter->read_stamp + event->time_delta;
37886f6a
SR
4880 ring_buffer_normalize_time_stamp(buffer,
4881 cpu_buffer->cpu, ts);
7a8e76a3
SR
4882 }
4883 return event;
4884
4885 default:
da4d401a 4886 RB_WARN_ON(cpu_buffer, 1);
7a8e76a3
SR
4887 }
4888
4889 return NULL;
4890}
c4f50183 4891EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
7a8e76a3 4892
289a5a25 4893static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
8d707e8e 4894{
289a5a25
SRRH
4895 if (likely(!in_nmi())) {
4896 raw_spin_lock(&cpu_buffer->reader_lock);
4897 return true;
4898 }
4899
8d707e8e
SR
4900 /*
4901 * If an NMI die dumps out the content of the ring buffer,
289a5a25
SRRH
4902 * trylock must be used to prevent a deadlock if the NMI
4903 * preempted a task that holds the ring buffer locks. If
4904 * we get the lock then all is fine, if not, then continue
4905 * to do the read, but this can corrupt the ring buffer,
4906 * so it must be permanently disabled from future writes.
4907 * Reading from NMI is a oneshot deal.
8d707e8e 4908 */
289a5a25
SRRH
4909 if (raw_spin_trylock(&cpu_buffer->reader_lock))
4910 return true;
8d707e8e 4911
289a5a25
SRRH
4912 /* Continue without locking, but disable the ring buffer */
4913 atomic_inc(&cpu_buffer->record_disabled);
4914 return false;
4915}
4916
4917static inline void
4918rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4919{
4920 if (likely(locked))
4921 raw_spin_unlock(&cpu_buffer->reader_lock);
8d707e8e
SR
4922}
4923
f83c9d0f
SR
4924/**
4925 * ring_buffer_peek - peek at the next event to be read
4926 * @buffer: The ring buffer to read
4927 * @cpu: The cpu to peek at
4928 * @ts: The timestamp counter of this event.
66a8cb95 4929 * @lost_events: a variable to store if events were lost (may be NULL)
f83c9d0f
SR
4930 *
4931 * This will return the event that will be read next, but does
4932 * not consume the data.
4933 */
4934struct ring_buffer_event *
13292494 4935ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
66a8cb95 4936 unsigned long *lost_events)
f83c9d0f
SR
4937{
4938 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
8aabee57 4939 struct ring_buffer_event *event;
f83c9d0f 4940 unsigned long flags;
289a5a25 4941 bool dolock;
f83c9d0f 4942
554f786e 4943 if (!cpumask_test_cpu(cpu, buffer->cpumask))
8aabee57 4944 return NULL;
554f786e 4945
2d622719 4946 again:
8d707e8e 4947 local_irq_save(flags);
289a5a25 4948 dolock = rb_reader_lock(cpu_buffer);
66a8cb95 4949 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
469535a5
RR
4950 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4951 rb_advance_reader(cpu_buffer);
289a5a25 4952 rb_reader_unlock(cpu_buffer, dolock);
8d707e8e 4953 local_irq_restore(flags);
f83c9d0f 4954
1b959e18 4955 if (event && event->type_len == RINGBUF_TYPE_PADDING)
2d622719 4956 goto again;
2d622719 4957
f83c9d0f
SR
4958 return event;
4959}
4960
c9b7a4a7
SRV
4961/**
 * ring_buffer_iter_dropped - report if there are dropped events
4962 * @iter: The ring buffer iterator
4963 *
4964 * Returns true if there were dropped events since the last peek.
4965 */
4966bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
4967{
4968 bool ret = iter->missed_events != 0;
4969
4970 iter->missed_events = 0;
4971 return ret;
4972}
4973EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);
4974
f83c9d0f
SR
4975/**
4976 * ring_buffer_iter_peek - peek at the next event to be read
4977 * @iter: The ring buffer iterator
4978 * @ts: The timestamp counter of this event.
4979 *
4980 * This will return the event that will be read next, but does
4981 * not increment the iterator.
4982 */
4983struct ring_buffer_event *
4984ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4985{
4986 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4987 struct ring_buffer_event *event;
4988 unsigned long flags;
4989
2d622719 4990 again:
5389f6fa 4991 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
f83c9d0f 4992 event = rb_iter_peek(iter, ts);
5389f6fa 4993 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
f83c9d0f 4994
1b959e18 4995 if (event && event->type_len == RINGBUF_TYPE_PADDING)
2d622719 4996 goto again;
2d622719 4997
f83c9d0f
SR
4998 return event;
4999}
5000
7a8e76a3
SR
5001/**
5002 * ring_buffer_consume - return an event and consume it
5003 * @buffer: The ring buffer to get the next event from
66a8cb95
SR
5004 * @cpu: the cpu to read the buffer from
5005 * @ts: a variable to store the timestamp (may be NULL)
5006 * @lost_events: a variable to store if events were lost (may be NULL)
7a8e76a3
SR
5007 *
5008 * Returns the next event in the ring buffer, and that event is consumed.
5009 * Meaning that sequential reads will keep returning a different event,
5010 * and eventually empty the ring buffer if the producer is slower.
5011 */
5012struct ring_buffer_event *
13292494 5013ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts,
66a8cb95 5014 unsigned long *lost_events)
7a8e76a3 5015{
554f786e
SR
5016 struct ring_buffer_per_cpu *cpu_buffer;
5017 struct ring_buffer_event *event = NULL;
f83c9d0f 5018 unsigned long flags;
289a5a25 5019 bool dolock;
7a8e76a3 5020
2d622719 5021 again:
554f786e
SR
5022 /* might be called in atomic */
5023 preempt_disable();
5024
9e01c1b7 5025 if (!cpumask_test_cpu(cpu, buffer->cpumask))
554f786e 5026 goto out;
7a8e76a3 5027
554f786e 5028 cpu_buffer = buffer->buffers[cpu];
8d707e8e 5029 local_irq_save(flags);
289a5a25 5030 dolock = rb_reader_lock(cpu_buffer);
f83c9d0f 5031
66a8cb95
SR
5032 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
5033 if (event) {
5034 cpu_buffer->lost_events = 0;
469535a5 5035 rb_advance_reader(cpu_buffer);
66a8cb95 5036 }
7a8e76a3 5037
289a5a25 5038 rb_reader_unlock(cpu_buffer, dolock);
8d707e8e 5039 local_irq_restore(flags);
f83c9d0f 5040
554f786e
SR
5041 out:
5042 preempt_enable();
5043
1b959e18 5044 if (event && event->type_len == RINGBUF_TYPE_PADDING)
2d622719 5045 goto again;
2d622719 5046
7a8e76a3
SR
5047 return event;
5048}
c4f50183 5049EXPORT_SYMBOL_GPL(ring_buffer_consume);
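/*
 * Usage sketch (illustrative only, not part of this file): drain one CPU
 * buffer with ring_buffer_consume(). handle_event() is a stand-in for
 * whatever the caller does with the payload.
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
 *		if (lost)
 *			pr_warn("lost %lu events\n", lost);
 *		handle_event(ring_buffer_event_data(event),
 *			     ring_buffer_event_length(event), ts);
 *	}
 */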
7a8e76a3
SR
5050
5051/**
72c9ddfd 5052 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
7a8e76a3
SR
5053 * @buffer: The ring buffer to read from
5054 * @cpu: The cpu buffer to iterate over
31b265b3 5055 * @flags: gfp flags to use for memory allocation
7a8e76a3 5056 *
72c9ddfd 5057 * This performs the initial preparations necessary to iterate
ea70a962 5058 * through the buffer. Memory is allocated, buffer resizing
72c9ddfd 5059 * is disabled, and the iterator pointer is returned to the caller.
7a8e76a3 5060 *
72c9ddfd 5061 * After a sequence of ring_buffer_read_prepare calls, the user is
d611851b 5062 * expected to make at least one call to ring_buffer_read_prepare_sync.
72c9ddfd
DM
5063 * Afterwards, ring_buffer_read_start is invoked to get things going
5064 * for real.
5065 *
d611851b 5066 * This overall must be paired with ring_buffer_read_finish.
7a8e76a3
SR
5067 */
5068struct ring_buffer_iter *
13292494 5069ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
7a8e76a3
SR
5070{
5071 struct ring_buffer_per_cpu *cpu_buffer;
8aabee57 5072 struct ring_buffer_iter *iter;
7a8e76a3 5073
9e01c1b7 5074 if (!cpumask_test_cpu(cpu, buffer->cpumask))
8aabee57 5075 return NULL;
7a8e76a3 5076
785888c5 5077 iter = kzalloc(sizeof(*iter), flags);
7a8e76a3 5078 if (!iter)
8aabee57 5079 return NULL;
7a8e76a3 5080
b0495258 5081 /* Holds the entire event: data and meta data */
139f8400
TSV
5082 iter->event_size = buffer->subbuf_size;
5083 iter->event = kmalloc(iter->event_size, flags);
785888c5
SRV
5084 if (!iter->event) {
5085 kfree(iter);
5086 return NULL;
5087 }
5088
7a8e76a3
SR
5089 cpu_buffer = buffer->buffers[cpu];
5090
5091 iter->cpu_buffer = cpu_buffer;
5092
07b8b10e 5093 atomic_inc(&cpu_buffer->resize_disabled);
72c9ddfd
DM
5094
5095 return iter;
5096}
5097EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
5098
5099/**
5100 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
5101 *
5102 * All previously invoked ring_buffer_read_prepare calls to prepare
5103 * iterators will be synchronized. Afterwards, ring_buffer_read_start
5104 * calls on those iterators are allowed.
5105 */
5106void
5107ring_buffer_read_prepare_sync(void)
5108{
74401729 5109 synchronize_rcu();
72c9ddfd
DM
5110}
5111EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
5112
5113/**
5114 * ring_buffer_read_start - start a non consuming read of the buffer
5115 * @iter: The iterator returned by ring_buffer_read_prepare
5116 *
5117 * This finalizes the startup of an iteration through the buffer.
5118 * The iterator comes from a call to ring_buffer_read_prepare and
5119 * an intervening ring_buffer_read_prepare_sync must have been
5120 * performed.
5121 *
d611851b 5122 * Must be paired with ring_buffer_read_finish.
72c9ddfd
DM
5123 */
5124void
5125ring_buffer_read_start(struct ring_buffer_iter *iter)
5126{
5127 struct ring_buffer_per_cpu *cpu_buffer;
5128 unsigned long flags;
5129
5130 if (!iter)
5131 return;
5132
5133 cpu_buffer = iter->cpu_buffer;
7a8e76a3 5134
5389f6fa 5135 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
0199c4e6 5136 arch_spin_lock(&cpu_buffer->lock);
642edba5 5137 rb_iter_reset(iter);
0199c4e6 5138 arch_spin_unlock(&cpu_buffer->lock);
5389f6fa 5139 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
7a8e76a3 5140}
c4f50183 5141EXPORT_SYMBOL_GPL(ring_buffer_read_start);
7a8e76a3
SR
5142
5143/**
d611851b 5144 * ring_buffer_read_finish - finish reading the iterator of the buffer
7a8e76a3
SR
5145 * @iter: The iterator retrieved by ring_buffer_read_prepare
5146 *
ea70a962 5147 * This re-enables resizing of the buffer, and frees the iterator.
7a8e76a3
SR
5148 */
5149void
5150ring_buffer_read_finish(struct ring_buffer_iter *iter)
5151{
5152 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
9366c1ba 5153 unsigned long flags;
7a8e76a3 5154
ea70a962 5155 /* Use this opportunity to check the integrity of the ring buffer. */
9366c1ba 5156 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
659f451f 5157 rb_check_pages(cpu_buffer);
9366c1ba 5158 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
659f451f 5159
07b8b10e 5160 atomic_dec(&cpu_buffer->resize_disabled);
785888c5 5161 kfree(iter->event);
7a8e76a3
SR
5162 kfree(iter);
5163}
c4f50183 5164EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
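/*
 * Usage sketch (illustrative only, not part of this file): the complete
 * non-consuming read sequence described by ring_buffer_read_prepare()'s
 * kernel-doc; inspect() is a stand-in for the caller's per-event handling.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	if (!iter)
 *		return -ENOMEM;
 *
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		inspect(event, ts);
 *		ring_buffer_iter_advance(iter);
 *	}
 *
 *	ring_buffer_read_finish(iter);
 */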
7a8e76a3
SR
5165
5166/**
bc1a72af 5167 * ring_buffer_iter_advance - advance the iterator to the next location
7a8e76a3 5168 * @iter: The ring buffer iterator
7a8e76a3 5169 *
bc1a72af
SRV
5170 * Move the location of the iterator such that the next read will
5171 * be the next location of the iterator.
7a8e76a3 5172 */
bc1a72af 5173void ring_buffer_iter_advance(struct ring_buffer_iter *iter)
7a8e76a3 5174{
f83c9d0f
SR
5175 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5176 unsigned long flags;
7a8e76a3 5177
5389f6fa 5178 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
7e9391cf 5179
7a8e76a3
SR
5180 rb_advance_iter(iter);
5181
bc1a72af 5182 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
7a8e76a3 5183}
bc1a72af 5184EXPORT_SYMBOL_GPL(ring_buffer_iter_advance);
7a8e76a3
SR
5185
5186/**
5187 * ring_buffer_size - return the size of the ring buffer (in bytes)
5188 * @buffer: The ring buffer.
59e7cffe 5189 * @cpu: The CPU to get ring buffer size from.
7a8e76a3 5190 */
13292494 5191unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu)
7a8e76a3 5192{
438ced17
VN
5193 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5194 return 0;
5195
139f8400 5196 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages;
7a8e76a3 5197}
c4f50183 5198EXPORT_SYMBOL_GPL(ring_buffer_size);
7a8e76a3 5199
8ec90be7
SRG
5200/**
5201 * ring_buffer_max_event_size - return the max data size of an event
5202 * @buffer: The ring buffer.
5203 *
5204 * Returns the maximum size an event can be.
5205 */
5206unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer)
5207{
5208 /* If abs timestamp is requested, events have a timestamp too */
5209 if (ring_buffer_time_stamp_abs(buffer))
139f8400
TSV
5210 return buffer->max_data_size - RB_LEN_TIME_EXTEND;
5211 return buffer->max_data_size;
8ec90be7
SRG
5212}
5213EXPORT_SYMBOL_GPL(ring_buffer_max_event_size);
5214
7e42907f
ZY
5215static void rb_clear_buffer_page(struct buffer_page *page)
5216{
5217 local_set(&page->write, 0);
5218 local_set(&page->entries, 0);
5219 rb_init_page(page->page);
5220 page->read = 0;
5221}
5222
117c3920
VD
5223static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
5224{
5225 struct trace_buffer_meta *meta = cpu_buffer->meta_page;
5226
5227 meta->reader.read = cpu_buffer->reader_page->read;
5228 meta->reader.id = cpu_buffer->reader_page->id;
5229 meta->reader.lost_events = cpu_buffer->lost_events;
5230
5231 meta->entries = local_read(&cpu_buffer->entries);
5232 meta->overrun = local_read(&cpu_buffer->overrun);
5233 meta->read = cpu_buffer->read;
5234
5235 /* Some archs do not have data cache coherency between kernel and user-space */
5236 flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page));
5237}
5238
7a8e76a3
SR
5239static void
5240rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
5241{
7e42907f
ZY
5242 struct buffer_page *page;
5243
77ae365e
SR
5244 rb_head_page_deactivate(cpu_buffer);
5245
7a8e76a3 5246 cpu_buffer->head_page
3adc54fa 5247 = list_entry(cpu_buffer->pages, struct buffer_page, list);
7e42907f
ZY
5248 rb_clear_buffer_page(cpu_buffer->head_page);
5249 list_for_each_entry(page, cpu_buffer->pages, list) {
5250 rb_clear_buffer_page(page);
5251 }
bf41a158
SR
5252
5253 cpu_buffer->tail_page = cpu_buffer->head_page;
5254 cpu_buffer->commit_page = cpu_buffer->head_page;
5255
5256 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
5040b4b7 5257 INIT_LIST_HEAD(&cpu_buffer->new_pages);
7e42907f 5258 rb_clear_buffer_page(cpu_buffer->reader_page);
7a8e76a3 5259
c64e148a 5260 local_set(&cpu_buffer->entries_bytes, 0);
77ae365e 5261 local_set(&cpu_buffer->overrun, 0);
884bfe89
SP
5262 local_set(&cpu_buffer->commit_overrun, 0);
5263 local_set(&cpu_buffer->dropped_events, 0);
e4906eff 5264 local_set(&cpu_buffer->entries, 0);
fa743953
SR
5265 local_set(&cpu_buffer->committing, 0);
5266 local_set(&cpu_buffer->commits, 0);
2c2b0a78 5267 local_set(&cpu_buffer->pages_touched, 0);
31029a8b 5268 local_set(&cpu_buffer->pages_lost, 0);
2c2b0a78 5269 local_set(&cpu_buffer->pages_read, 0);
03329f99 5270 cpu_buffer->last_pages_touch = 0;
2c2b0a78 5271 cpu_buffer->shortest_full = 0;
77ae365e 5272 cpu_buffer->read = 0;
c64e148a 5273 cpu_buffer->read_bytes = 0;
69507c06 5274
10464b4a
SRV
5275 rb_time_set(&cpu_buffer->write_stamp, 0);
5276 rb_time_set(&cpu_buffer->before_stamp, 0);
77ae365e 5277
8672e494
SRV
5278 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp));
5279
66a8cb95
SR
5280 cpu_buffer->lost_events = 0;
5281 cpu_buffer->last_overrun = 0;
5282
117c3920
VD
5283 if (cpu_buffer->mapped)
5284 rb_update_meta_page(cpu_buffer);
5285
77ae365e 5286 rb_head_page_activate(cpu_buffer);
2d093282 5287 cpu_buffer->pages_removed = 0;
7a8e76a3
SR
5288}
5289
b23d7a5f
NP
5290/* Must have disabled the cpu buffer then done a synchronize_rcu */
5291static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
5292{
5293 unsigned long flags;
5294
5295 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5296
5297 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
5298 goto out;
5299
5300 arch_spin_lock(&cpu_buffer->lock);
5301
5302 rb_reset_cpu(cpu_buffer);
5303
5304 arch_spin_unlock(&cpu_buffer->lock);
5305
5306 out:
5307 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5308}
5309
7a8e76a3
SR
5310/**
5311 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
5312 * @buffer: The ring buffer to reset a per cpu buffer of
5313 * @cpu: The CPU buffer to be reset
5314 */
13292494 5315void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
7a8e76a3
SR
5316{
5317 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
7a8e76a3 5318
9e01c1b7 5319 if (!cpumask_test_cpu(cpu, buffer->cpumask))
8aabee57 5320 return;
7a8e76a3 5321
bbeb9746
GK
5322 /* prevent another thread from changing buffer sizes */
5323 mutex_lock(&buffer->mutex);
5324
07b8b10e 5325 atomic_inc(&cpu_buffer->resize_disabled);
41ede23e
SR
5326 atomic_inc(&cpu_buffer->record_disabled);
5327
83f40318 5328 /* Make sure all commits have finished */
74401729 5329 synchronize_rcu();
83f40318 5330
b23d7a5f 5331 reset_disabled_cpu_buffer(cpu_buffer);
f83c9d0f 5332
b23d7a5f
NP
5333 atomic_dec(&cpu_buffer->record_disabled);
5334 atomic_dec(&cpu_buffer->resize_disabled);
bbeb9746
GK
5335
5336 mutex_unlock(&buffer->mutex);
b23d7a5f
NP
5337}
5338EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
41b6a95d 5339
7c339fb4
TW
5340/* Flag to ensure proper resetting of atomic variables */
5341#define RESET_BIT (1 << 30)
5342
b23d7a5f 5343/**
b7085b6f 5344 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer
b23d7a5f 5345 * @buffer: The ring buffer to reset a per cpu buffer of
b23d7a5f
NP
5346 */
5347void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
5348{
5349 struct ring_buffer_per_cpu *cpu_buffer;
5350 int cpu;
7a8e76a3 5351
bbeb9746
GK
5352 /* prevent another thread from changing buffer sizes */
5353 mutex_lock(&buffer->mutex);
5354
b23d7a5f
NP
5355 for_each_online_buffer_cpu(buffer, cpu) {
5356 cpu_buffer = buffer->buffers[cpu];
7a8e76a3 5357
7c339fb4 5358 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled);
b23d7a5f
NP
5359 atomic_inc(&cpu_buffer->record_disabled);
5360 }
f83c9d0f 5361
b23d7a5f
NP
5362 /* Make sure all commits have finished */
5363 synchronize_rcu();
41ede23e 5364
7c339fb4 5365 for_each_buffer_cpu(buffer, cpu) {
b23d7a5f
NP
5366 cpu_buffer = buffer->buffers[cpu];
5367
7c339fb4
TW
5368 /*
5369 * If a CPU came online during the synchronize_rcu(), then
5370 * ignore it.
5371 */
5372 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT))
5373 continue;
5374
b23d7a5f
NP
5375 reset_disabled_cpu_buffer(cpu_buffer);
5376
5377 atomic_dec(&cpu_buffer->record_disabled);
7c339fb4 5378 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled);
b23d7a5f 5379 }
bbeb9746
GK
5380
5381 mutex_unlock(&buffer->mutex);
7a8e76a3
SR
5382}
5383
5384/**
5385 * ring_buffer_reset - reset a ring buffer
5386 * @buffer: The ring buffer to reset all cpu buffers
5387 */
13292494 5388void ring_buffer_reset(struct trace_buffer *buffer)
7a8e76a3 5389{
b23d7a5f 5390 struct ring_buffer_per_cpu *cpu_buffer;
7a8e76a3
SR
5391 int cpu;
5392
51d15794
SRV
5393 /* prevent another thread from changing buffer sizes */
5394 mutex_lock(&buffer->mutex);
5395
b23d7a5f
NP
5396 for_each_buffer_cpu(buffer, cpu) {
5397 cpu_buffer = buffer->buffers[cpu];
5398
5399 atomic_inc(&cpu_buffer->resize_disabled);
5400 atomic_inc(&cpu_buffer->record_disabled);
5401 }
5402
5403 /* Make sure all commits have finished */
5404 synchronize_rcu();
5405
5406 for_each_buffer_cpu(buffer, cpu) {
5407 cpu_buffer = buffer->buffers[cpu];
5408
5409 reset_disabled_cpu_buffer(cpu_buffer);
5410
5411 atomic_dec(&cpu_buffer->record_disabled);
5412 atomic_dec(&cpu_buffer->resize_disabled);
5413 }
51d15794
SRV
5414
5415 mutex_unlock(&buffer->mutex);
7a8e76a3 5416}
c4f50183 5417EXPORT_SYMBOL_GPL(ring_buffer_reset);
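/*
 * Usage sketch (illustrative only, not part of this file): wipe all
 * recorded data, e.g. between self-test runs, with writers switched off
 * around the reset. A single CPU can be cleared the same way with
 * ring_buffer_reset_cpu().
 *
 *	ring_buffer_record_off(buffer);
 *	ring_buffer_reset(buffer);
 *	ring_buffer_record_on(buffer);
 */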
7a8e76a3
SR
5418
5419/**
b7085b6f 5420 * ring_buffer_empty - is the ring buffer empty?
7a8e76a3
SR
5421 * @buffer: The ring buffer to test
5422 */
13292494 5423bool ring_buffer_empty(struct trace_buffer *buffer)
7a8e76a3
SR
5424{
5425 struct ring_buffer_per_cpu *cpu_buffer;
d4788207 5426 unsigned long flags;
289a5a25 5427 bool dolock;
bc92b956 5428 bool ret;
7a8e76a3
SR
5429 int cpu;
5430
5431 /* yes this is racy, but if you don't like the race, lock the buffer */
5432 for_each_buffer_cpu(buffer, cpu) {
5433 cpu_buffer = buffer->buffers[cpu];
8d707e8e 5434 local_irq_save(flags);
289a5a25 5435 dolock = rb_reader_lock(cpu_buffer);
d4788207 5436 ret = rb_per_cpu_empty(cpu_buffer);
289a5a25 5437 rb_reader_unlock(cpu_buffer, dolock);
8d707e8e
SR
5438 local_irq_restore(flags);
5439
d4788207 5440 if (!ret)
3d4e204d 5441 return false;
7a8e76a3 5442 }
554f786e 5443
3d4e204d 5444 return true;
7a8e76a3 5445}
c4f50183 5446EXPORT_SYMBOL_GPL(ring_buffer_empty);
7a8e76a3
SR
5447
5448/**
5449 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
5450 * @buffer: The ring buffer
5451 * @cpu: The CPU buffer to test
5452 */
13292494 5453bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu)
7a8e76a3
SR
5454{
5455 struct ring_buffer_per_cpu *cpu_buffer;
d4788207 5456 unsigned long flags;
289a5a25 5457 bool dolock;
bc92b956 5458 bool ret;
7a8e76a3 5459
9e01c1b7 5460 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3d4e204d 5461 return true;
7a8e76a3
SR
5462
5463 cpu_buffer = buffer->buffers[cpu];
8d707e8e 5464 local_irq_save(flags);
289a5a25 5465 dolock = rb_reader_lock(cpu_buffer);
554f786e 5466 ret = rb_per_cpu_empty(cpu_buffer);
289a5a25 5467 rb_reader_unlock(cpu_buffer, dolock);
8d707e8e 5468 local_irq_restore(flags);
554f786e
SR
5469
5470 return ret;
7a8e76a3 5471}
c4f50183 5472EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
7a8e76a3 5473
85bac32c 5474#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
7a8e76a3
SR
5475/**
5476 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
5477 * @buffer_a: One buffer to swap with
5478 * @buffer_b: The other buffer to swap with
59e7cffe 5479 * @cpu: the CPU of the buffers to swap
7a8e76a3
SR
5480 *
5481 * This function is useful for tracers that want to take a "snapshot"
5482 * of a CPU buffer and have another backup buffer lying around.
 5483 * It is expected that the tracer handles the cpu buffer not being
5484 * used at the moment.
5485 */
13292494
SRV
5486int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
5487 struct trace_buffer *buffer_b, int cpu)
7a8e76a3
SR
5488{
5489 struct ring_buffer_per_cpu *cpu_buffer_a;
5490 struct ring_buffer_per_cpu *cpu_buffer_b;
554f786e
SR
5491 int ret = -EINVAL;
5492
9e01c1b7
RR
5493 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
5494 !cpumask_test_cpu(cpu, buffer_b->cpumask))
554f786e 5495 goto out;
7a8e76a3 5496
438ced17
VN
5497 cpu_buffer_a = buffer_a->buffers[cpu];
5498 cpu_buffer_b = buffer_b->buffers[cpu];
5499
117c3920
VD
5500 /* It's up to the callers to not try to swap mapped buffers */
5501 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) {
5502 ret = -EBUSY;
5503 goto out;
5504 }
5505
7a8e76a3 5506 /* At least make sure the two buffers are somewhat the same */
438ced17 5507 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
554f786e
SR
5508 goto out;
5509
b81e03a2
SRG
5510 if (buffer_a->subbuf_order != buffer_b->subbuf_order)
5511 goto out;
5512
554f786e 5513 ret = -EAGAIN;
7a8e76a3 5514
97b17efe 5515 if (atomic_read(&buffer_a->record_disabled))
554f786e 5516 goto out;
97b17efe
SR
5517
5518 if (atomic_read(&buffer_b->record_disabled))
554f786e 5519 goto out;
97b17efe 5520
97b17efe 5521 if (atomic_read(&cpu_buffer_a->record_disabled))
554f786e 5522 goto out;
97b17efe
SR
5523
5524 if (atomic_read(&cpu_buffer_b->record_disabled))
554f786e 5525 goto out;
97b17efe 5526
7a8e76a3 5527 /*
74401729 5528 * We can't do a synchronize_rcu here because this
7a8e76a3
SR
5529 * function can be called in atomic context.
5530 * Normally this will be called from the same CPU as cpu.
5531 * If not it's up to the caller to protect this.
5532 */
5533 atomic_inc(&cpu_buffer_a->record_disabled);
5534 atomic_inc(&cpu_buffer_b->record_disabled);
5535
98277991
SR
5536 ret = -EBUSY;
5537 if (local_read(&cpu_buffer_a->committing))
5538 goto out_dec;
5539 if (local_read(&cpu_buffer_b->committing))
5540 goto out_dec;
5541
8a96c028
CL
5542 /*
5543 * When resize is in progress, we cannot swap it because
5544 * it will mess the state of the cpu buffer.
5545 */
5546 if (atomic_read(&buffer_a->resizing))
5547 goto out_dec;
5548 if (atomic_read(&buffer_b->resizing))
5549 goto out_dec;
5550
7a8e76a3
SR
5551 buffer_a->buffers[cpu] = cpu_buffer_b;
5552 buffer_b->buffers[cpu] = cpu_buffer_a;
5553
5554 cpu_buffer_b->buffer = buffer_a;
5555 cpu_buffer_a->buffer = buffer_b;
5556
98277991
SR
5557 ret = 0;
5558
5559out_dec:
7a8e76a3
SR
5560 atomic_dec(&cpu_buffer_a->record_disabled);
5561 atomic_dec(&cpu_buffer_b->record_disabled);
554f786e 5562out:
554f786e 5563 return ret;
7a8e76a3 5564}
c4f50183 5565EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
85bac32c 5566#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
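/*
 * Usage sketch (illustrative only, not part of this file, and only
 * meaningful with CONFIG_RING_BUFFER_ALLOW_SWAP): take a per-CPU
 * "snapshot" by swapping the live buffer with a spare buffer that was
 * allocated with the same number of pages and sub-buffer order.
 *
 *	int err;
 *
 *	err = ring_buffer_swap_cpu(live_buffer, snapshot_buffer, cpu);
 *	if (err)
 *		pr_warn("snapshot of CPU %d failed: %d\n", cpu, err);
 */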
7a8e76a3 5567
8789a9e7
SR
5568/**
5569 * ring_buffer_alloc_read_page - allocate a page to read from buffer
5570 * @buffer: the buffer to allocate for.
d611851b 5571 * @cpu: the cpu buffer to allocate.
8789a9e7
SR
5572 *
5573 * This function is used in conjunction with ring_buffer_read_page.
5574 * When reading a full page from the ring buffer, these functions
5575 * can be used to speed up the process. The calling function should
5576 * allocate a few pages first with this function. Then when it
5577 * needs to get pages from the ring buffer, it passes the result
5578 * of this function into ring_buffer_read_page, which will swap
5579 * the page that was allocated, with the read page of the buffer.
5580 *
5581 * Returns:
a7e52ad7 5582 * The page allocated, or ERR_PTR
8789a9e7 5583 */
bce761d7
TSV
5584struct buffer_data_read_page *
5585ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
8789a9e7 5586{
a7e52ad7 5587 struct ring_buffer_per_cpu *cpu_buffer;
bce761d7 5588 struct buffer_data_read_page *bpage = NULL;
73a757e6 5589 unsigned long flags;
7ea59064 5590 struct page *page;
8789a9e7 5591
a7e52ad7
SRV
5592 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5593 return ERR_PTR(-ENODEV);
5594
bce761d7
TSV
5595 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL);
5596 if (!bpage)
5597 return ERR_PTR(-ENOMEM);
5598
5599 bpage->order = buffer->subbuf_order;
a7e52ad7 5600 cpu_buffer = buffer->buffers[cpu];
73a757e6
SRV
5601 local_irq_save(flags);
5602 arch_spin_lock(&cpu_buffer->lock);
5603
5604 if (cpu_buffer->free_page) {
bce761d7 5605 bpage->data = cpu_buffer->free_page;
73a757e6
SRV
5606 cpu_buffer->free_page = NULL;
5607 }
5608
5609 arch_spin_unlock(&cpu_buffer->lock);
5610 local_irq_restore(flags);
5611
bce761d7 5612 if (bpage->data)
73a757e6
SRV
5613 goto out;
5614
6b76323e 5615 page = alloc_pages_node(cpu_to_node(cpu),
c09d4167 5616 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO,
f9b94daa 5617 cpu_buffer->buffer->subbuf_order);
bce761d7
TSV
5618 if (!page) {
5619 kfree(bpage);
a7e52ad7 5620 return ERR_PTR(-ENOMEM);
bce761d7 5621 }
8789a9e7 5622
bce761d7 5623 bpage->data = page_address(page);
8789a9e7 5624
73a757e6 5625 out:
bce761d7 5626 rb_init_page(bpage->data);
ef7a4a16 5627
044fa782 5628 return bpage;
8789a9e7 5629}
d6ce96da 5630EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
8789a9e7
SR
5631
5632/**
5633 * ring_buffer_free_read_page - free an allocated read page
5634 * @buffer: the buffer the page was allocate for
73a757e6 5635 * @cpu: the cpu buffer the page came from
bce761d7 5636 * @data_page: the page to free
8789a9e7
SR
5637 *
5638 * Free a page allocated from ring_buffer_alloc_read_page.
5639 */
bce761d7
TSV
5640void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu,
5641 struct buffer_data_read_page *data_page)
8789a9e7 5642{
3e4272b9 5643 struct ring_buffer_per_cpu *cpu_buffer;
bce761d7 5644 struct buffer_data_page *bpage = data_page->data;
ae415fa4 5645 struct page *page = virt_to_page(bpage);
73a757e6
SRV
5646 unsigned long flags;
5647
3e4272b9
JJB
5648 if (!buffer || !buffer->buffers || !buffer->buffers[cpu])
5649 return;
5650
5651 cpu_buffer = buffer->buffers[cpu];
5652
bce761d7
TSV
5653 /*
5654	 * If the page is still in use someplace else, or the order of the page
 5655	 * is different from the subbuffer order of the buffer,
 5656	 * we can't reuse it.
5657 */
5658 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order)
ae415fa4
SRV
5659 goto out;
5660
73a757e6
SRV
5661 local_irq_save(flags);
5662 arch_spin_lock(&cpu_buffer->lock);
5663
5664 if (!cpu_buffer->free_page) {
5665 cpu_buffer->free_page = bpage;
5666 bpage = NULL;
5667 }
5668
5669 arch_spin_unlock(&cpu_buffer->lock);
5670 local_irq_restore(flags);
5671
ae415fa4 5672 out:
bce761d7
TSV
5673 free_pages((unsigned long)bpage, data_page->order);
5674 kfree(data_page);
8789a9e7 5675}
d6ce96da 5676EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
8789a9e7
SR
5677
5678/**
5679 * ring_buffer_read_page - extract a page from the ring buffer
5680 * @buffer: buffer to extract from
5681 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
ef7a4a16 5682 * @len: amount to extract
8789a9e7
SR
5683 * @cpu: the cpu of the buffer to extract
5684 * @full: should the extraction only happen when the page is full.
5685 *
5686 * This function will pull out a page from the ring buffer and consume it.
5687 * @data_page must be the address of the variable that was returned
5688 * from ring_buffer_alloc_read_page. This is because the page might be used
5689 * to swap with a page in the ring buffer.
5690 *
5691 * for example:
d611851b 5692 * rpage = ring_buffer_alloc_read_page(buffer, cpu);
a7e52ad7
SRV
5693 * if (IS_ERR(rpage))
5694 * return PTR_ERR(rpage);
bce761d7 5695 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0);
667d2412 5696 * if (ret >= 0)
bce761d7
TSV
5697 * process_page(ring_buffer_read_page_data(rpage), ret);
5698 * ring_buffer_free_read_page(buffer, cpu, rpage);
8789a9e7
SR
5699 *
5700 * When @full is set, the function will not succeed unless
5701 * the writer is off the reader page.
5702 *
5703 * Note: it is up to the calling functions to handle sleeps and wakeups.
5704 * The ring buffer can be used anywhere in the kernel and can not
5705 * blindly call wake_up. The layer that uses the ring buffer must be
5706 * responsible for that.
5707 *
5708 * Returns:
667d2412
LJ
5709 * >=0 if data has been transferred, returns the offset of consumed data.
5710 * <0 if no data has been transferred.
8789a9e7 5711 */
13292494 5712int ring_buffer_read_page(struct trace_buffer *buffer,
bce761d7
TSV
5713 struct buffer_data_read_page *data_page,
5714 size_t len, int cpu, int full)
8789a9e7
SR
5715{
5716 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5717 struct ring_buffer_event *event;
044fa782 5718 struct buffer_data_page *bpage;
ef7a4a16 5719 struct buffer_page *reader;
ff0ff84a 5720 unsigned long missed_events;
8789a9e7 5721 unsigned long flags;
ef7a4a16 5722 unsigned int commit;
667d2412 5723 unsigned int read;
4f3640f8 5724 u64 save_timestamp;
667d2412 5725 int ret = -1;
8789a9e7 5726
554f786e
SR
5727 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5728 goto out;
5729
474d32b6
SR
5730 /*
5731 * If len is not big enough to hold the page header, then
5732 * we can not copy anything.
5733 */
5734 if (len <= BUF_PAGE_HDR_SIZE)
554f786e 5735 goto out;
474d32b6
SR
5736
5737 len -= BUF_PAGE_HDR_SIZE;
5738
bce761d7
TSV
5739 if (!data_page || !data_page->data)
5740 goto out;
5741 if (data_page->order != buffer->subbuf_order)
554f786e 5742 goto out;
8789a9e7 5743
bce761d7 5744 bpage = data_page->data;
044fa782 5745 if (!bpage)
554f786e 5746 goto out;
8789a9e7 5747
5389f6fa 5748 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
8789a9e7 5749
ef7a4a16
SR
5750 reader = rb_get_reader_page(cpu_buffer);
5751 if (!reader)
554f786e 5752 goto out_unlock;
8789a9e7 5753
ef7a4a16
SR
5754 event = rb_reader_event(cpu_buffer);
5755
5756 read = reader->read;
fe832be0 5757 commit = rb_page_size(reader);
667d2412 5758
66a8cb95 5759 /* Check if any events were dropped */
ff0ff84a 5760 missed_events = cpu_buffer->lost_events;
66a8cb95 5761
8789a9e7 5762 /*
474d32b6
SR
5763 * If this page has been partially read or
5764 * if len is not big enough to read the rest of the page or
5765 * a writer is still on the page, then
5766 * we must copy the data from the page to the buffer.
5767 * Otherwise, we can simply swap the page with the one passed in.
8789a9e7 5768 */
474d32b6 5769 if (read || (len < (commit - read)) ||
117c3920
VD
5770 cpu_buffer->reader_page == cpu_buffer->commit_page ||
5771 cpu_buffer->mapped) {
667d2412 5772 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
474d32b6
SR
5773 unsigned int rpos = read;
5774 unsigned int pos = 0;
ef7a4a16 5775 unsigned int size;
8789a9e7 5776
fa8f4a89
SRG
5777 /*
5778 * If a full page is expected, this can still be returned
5779 * if there's been a previous partial read and the
5780 * rest of the page can be read and the commit page is off
5781 * the reader page.
5782 */
5783 if (full &&
5784 (!read || (len < (commit - read)) ||
5785 cpu_buffer->reader_page == cpu_buffer->commit_page))
554f786e 5786 goto out_unlock;
8789a9e7 5787
ef7a4a16
SR
5788 if (len > (commit - read))
5789 len = (commit - read);
5790
69d1b839
SR
5791 /* Always keep the time extend and data together */
5792 size = rb_event_ts_length(event);
ef7a4a16
SR
5793
5794 if (len < size)
554f786e 5795 goto out_unlock;
ef7a4a16 5796
4f3640f8
SR
5797 /* save the current timestamp, since the user will need it */
5798 save_timestamp = cpu_buffer->read_stamp;
5799
ef7a4a16
SR
5800 /* Need to copy one event at a time */
5801 do {
e1e35927
DS
5802 /* We need the size of one event, because
5803 * rb_advance_reader only advances by one event,
5804 * whereas rb_event_ts_length may include the size of
5805 * one or two events.
5806 * We have already ensured there's enough space if this
5807 * is a time extend. */
5808 size = rb_event_length(event);
474d32b6 5809 memcpy(bpage->data + pos, rpage->data + rpos, size);
ef7a4a16
SR
5810
5811 len -= size;
5812
5813 rb_advance_reader(cpu_buffer);
474d32b6
SR
5814 rpos = reader->read;
5815 pos += size;
ef7a4a16 5816
18fab912
HY
5817 if (rpos >= commit)
5818 break;
5819
ef7a4a16 5820 event = rb_reader_event(cpu_buffer);
69d1b839
SR
5821 /* Always keep the time extend and data together */
5822 size = rb_event_ts_length(event);
e1e35927 5823 } while (len >= size);
667d2412
LJ
5824
5825 /* update bpage */
ef7a4a16 5826 local_set(&bpage->commit, pos);
4f3640f8 5827 bpage->time_stamp = save_timestamp;
ef7a4a16 5828
474d32b6
SR
5829 /* we copied everything to the beginning */
5830 read = 0;
8789a9e7 5831 } else {
afbab76a 5832 /* update the entry counter */
77ae365e 5833 cpu_buffer->read += rb_page_entries(reader);
fe832be0 5834 cpu_buffer->read_bytes += rb_page_size(reader);
afbab76a 5835
8789a9e7 5836 /* swap the pages */
044fa782 5837 rb_init_page(bpage);
ef7a4a16 5838 bpage = reader->page;
bce761d7 5839 reader->page = data_page->data;
ef7a4a16 5840 local_set(&reader->write, 0);
778c55d4 5841 local_set(&reader->entries, 0);
ef7a4a16 5842 reader->read = 0;
bce761d7 5843 data_page->data = bpage;
ff0ff84a
SR
5844
5845 /*
5846 * Use the real_end for the data size,
5847 * This gives us a chance to store the lost events
5848 * on the page.
5849 */
5850 if (reader->real_end)
5851 local_set(&bpage->commit, reader->real_end);
8789a9e7 5852 }
667d2412 5853 ret = read;
8789a9e7 5854
66a8cb95 5855 cpu_buffer->lost_events = 0;
2711ca23
SR
5856
5857 commit = local_read(&bpage->commit);
66a8cb95
SR
5858 /*
5859 * Set a flag in the commit field if we lost events
5860 */
ff0ff84a 5861 if (missed_events) {
ff0ff84a
SR
5862 /* If there is room at the end of the page to save the
5863 * missed events, then record it there.
5864 */
139f8400 5865 if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
ff0ff84a
SR
5866 memcpy(&bpage->data[commit], &missed_events,
5867 sizeof(missed_events));
5868 local_add(RB_MISSED_STORED, &bpage->commit);
2711ca23 5869 commit += sizeof(missed_events);
ff0ff84a 5870 }
66a8cb95 5871 local_add(RB_MISSED_EVENTS, &bpage->commit);
ff0ff84a 5872 }
66a8cb95 5873
2711ca23
SR
5874 /*
5875 * This page may be off to user land. Zero it out here.
5876 */
139f8400
TSV
5877 if (commit < buffer->subbuf_size)
5878 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit);
2711ca23 5879
554f786e 5880 out_unlock:
5389f6fa 5881 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
8789a9e7 5882
554f786e 5883 out:
8789a9e7
SR
5884 return ret;
5885}
d6ce96da 5886EXPORT_SYMBOL_GPL(ring_buffer_read_page);
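/*
 * Usage sketch (illustrative only, not part of this file): pull whole
 * sub-buffers from one CPU with the page-read API. consume_page() is a
 * stand-in for whatever copies or splices the data out; the read page
 * is reused across iterations and freed at the end.
 *
 *	struct buffer_data_read_page *rpage;
 *	size_t len = ring_buffer_subbuf_size_get(buffer);
 *	int ret;
 *
 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
 *	if (IS_ERR(rpage))
 *		return PTR_ERR(rpage);
 *
 *	do {
 *		ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0);
 *		if (ret >= 0)
 *			consume_page(ring_buffer_read_page_data(rpage), ret);
 *	} while (ret >= 0);
 *
 *	ring_buffer_free_read_page(buffer, cpu, rpage);
 */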
8789a9e7 5887
bce761d7
TSV
5888/**
5889 * ring_buffer_read_page_data - get pointer to the data in the page.
5890 * @page: the page to get the data from
5891 *
5892 * Returns pointer to the actual data in this page.
5893 */
5894void *ring_buffer_read_page_data(struct buffer_data_read_page *page)
5895{
5896 return page->data;
5897}
5898EXPORT_SYMBOL_GPL(ring_buffer_read_page_data);
5899
2808e31e
TSV
5900/**
5901 * ring_buffer_subbuf_size_get - get size of the sub buffer.
5902 * @buffer: the buffer to get the sub buffer size from
5903 *
5904 * Returns size of the sub buffer, in bytes.
5905 */
5906int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
5907{
5908 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
5909}
5910EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);
5911
5912/**
5913 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page.
5914 * @buffer: The ring_buffer to get the system sub page order from
5915 *
5916 * By default, one ring buffer sub page equals one system page. This parameter
5917 * is configurable, per ring buffer. The size of the ring buffer sub page can be
5918 * extended, but must be an order of system page size.
5919 *
5920 * Returns the order of buffer sub page size, in system pages:
5921 * 0 means the sub buffer size is 1 system page and so forth.
5922 * In case of an error < 0 is returned.
5923 */
5924int ring_buffer_subbuf_order_get(struct trace_buffer *buffer)
5925{
5926 if (!buffer)
5927 return -EINVAL;
5928
5929 return buffer->subbuf_order;
5930}
5931EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);
5932
5933/**
5934 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
5935 * @buffer: The ring_buffer to set the new page size.
5936 * @order: Order of the system pages in one sub buffer page
5937 *
5938 * By default, one ring buffer page equals one system page. This API can be
 5939 * used to set a new size of the ring buffer page. The size must be an order of
5940 * system page size, that's why the input parameter @order is the order of
5941 * system pages that are allocated for one ring buffer page:
5942 * 0 - 1 system page
5943 * 1 - 2 system pages
5944 * 2 - 4 system pages
5945 * ...
5946 *
5947 * Returns 0 on success or < 0 in case of an error.
5948 */
5949int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
5950{
8e7b58c2
SRG
5951 struct ring_buffer_per_cpu *cpu_buffer;
5952 struct buffer_page *bpage, *tmp;
f9b94daa
TSV
5953 int old_order, old_size;
5954 int nr_pages;
2808e31e 5955 int psize;
f9b94daa
TSV
5956 int err;
5957 int cpu;
2808e31e
TSV
5958
5959 if (!buffer || order < 0)
5960 return -EINVAL;
5961
5962 if (buffer->subbuf_order == order)
5963 return 0;
5964
5965 psize = (1 << order) * PAGE_SIZE;
5966 if (psize <= BUF_PAGE_HDR_SIZE)
5967 return -EINVAL;
5968
e78fb4ea
SRG
5969 /* Size of a subbuf cannot be greater than the write counter */
5970 if (psize > RB_WRITE_MASK + 1)
5971 return -EINVAL;
5972
f9b94daa
TSV
5973 old_order = buffer->subbuf_order;
5974 old_size = buffer->subbuf_size;
5975
5976 /* prevent another thread from changing buffer sizes */
5977 mutex_lock(&buffer->mutex);
5978 atomic_inc(&buffer->record_disabled);
5979
5980 /* Make sure all commits have finished */
5981 synchronize_rcu();
5982
2808e31e
TSV
5983 buffer->subbuf_order = order;
5984 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE;
5985
f9b94daa
TSV
5986 /* Make sure all new buffers are allocated, before deleting the old ones */
5987 for_each_buffer_cpu(buffer, cpu) {
8e7b58c2 5988
f9b94daa
TSV
5989 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5990 continue;
5991
8e7b58c2
SRG
5992 cpu_buffer = buffer->buffers[cpu];
5993
117c3920
VD
5994 if (cpu_buffer->mapped) {
5995 err = -EBUSY;
5996 goto error;
5997 }
5998
353cc219
SRG
5999 /* Update the number of pages to match the new size */
6000 nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
6001 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
6002
8e7b58c2
SRG
6003 /* we need a minimum of two pages */
6004 if (nr_pages < 2)
6005 nr_pages = 2;
6006
6007 cpu_buffer->nr_pages_to_update = nr_pages;
6008
6009 /* Include the reader page */
6010 nr_pages++;
6011
6012 /* Allocate the new size buffer */
6013 INIT_LIST_HEAD(&cpu_buffer->new_pages);
6014 if (__rb_allocate_pages(cpu_buffer, nr_pages,
6015 &cpu_buffer->new_pages)) {
6016 /* not enough memory for new pages */
f9b94daa
TSV
6017 err = -ENOMEM;
6018 goto error;
6019 }
6020 }
6021
6022 for_each_buffer_cpu(buffer, cpu) {
8e7b58c2 6023
f9b94daa
TSV
6024 if (!cpumask_test_cpu(cpu, buffer->cpumask))
6025 continue;
6026
8e7b58c2
SRG
6027 cpu_buffer = buffer->buffers[cpu];
6028
6029 /* Clear the head bit to make the link list normal to read */
6030 rb_head_page_deactivate(cpu_buffer);
6031
6032 /* Now walk the list and free all the old sub buffers */
6033 list_for_each_entry_safe(bpage, tmp, cpu_buffer->pages, list) {
6034 list_del_init(&bpage->list);
6035 free_buffer_page(bpage);
6036 }
6037		/* The above loop stopped on the last page needing to be freed */
6038 bpage = list_entry(cpu_buffer->pages, struct buffer_page, list);
6039 free_buffer_page(bpage);
6040
6041 /* Free the current reader page */
6042 free_buffer_page(cpu_buffer->reader_page);
6043
6044 /* One page was allocated for the reader page */
6045 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next,
6046 struct buffer_page, list);
6047 list_del_init(&cpu_buffer->reader_page->list);
6048
6049 /* The cpu_buffer pages are a link list with no head */
6050 cpu_buffer->pages = cpu_buffer->new_pages.next;
6051 cpu_buffer->new_pages.next->prev = cpu_buffer->new_pages.prev;
6052 cpu_buffer->new_pages.prev->next = cpu_buffer->new_pages.next;
6053
6054 /* Clear the new_pages list */
6055 INIT_LIST_HEAD(&cpu_buffer->new_pages);
6056
6057 cpu_buffer->head_page
6058 = list_entry(cpu_buffer->pages, struct buffer_page, list);
6059 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
6060
6061 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update;
6062 cpu_buffer->nr_pages_to_update = 0;
6063
6064 free_pages((unsigned long)cpu_buffer->free_page, old_order);
6065 cpu_buffer->free_page = NULL;
6066
6067 rb_head_page_activate(cpu_buffer);
6068
6069 rb_check_pages(cpu_buffer);
f9b94daa
TSV
6070 }
6071
6072 atomic_dec(&buffer->record_disabled);
6073 mutex_unlock(&buffer->mutex);
6074
2808e31e 6075 return 0;
f9b94daa
TSV
6076
6077error:
6078 buffer->subbuf_order = old_order;
6079 buffer->subbuf_size = old_size;
6080
6081 atomic_dec(&buffer->record_disabled);
6082 mutex_unlock(&buffer->mutex);
6083
6084 for_each_buffer_cpu(buffer, cpu) {
8e7b58c2
SRG
6085 cpu_buffer = buffer->buffers[cpu];
6086
6087 if (!cpu_buffer->nr_pages_to_update)
f9b94daa 6088 continue;
8e7b58c2
SRG
6089
6090 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) {
6091 list_del_init(&bpage->list);
6092 free_buffer_page(bpage);
6093 }
f9b94daa 6094 }
f9b94daa
TSV
6095
6096 return err;
2808e31e
TSV
6097}
6098EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
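/*
 * Usage sketch (illustrative only, not part of this file): grow the
 * sub-buffers to two system pages (order 1); on a 4K-page system each
 * sub-buffer then holds 8K minus the page header.
 *
 *	if (ring_buffer_subbuf_order_get(buffer) != 1) {
 *		int err = ring_buffer_subbuf_order_set(buffer, 1);
 *
 *		if (err)
 *			pr_warn("sub-buffer resize failed: %d\n", err);
 *	}
 */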
6099
117c3920
VD
6100static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
6101{
6102 struct page *page;
6103
6104 if (cpu_buffer->meta_page)
6105 return 0;
6106
6107 page = alloc_page(GFP_USER | __GFP_ZERO);
6108 if (!page)
6109 return -ENOMEM;
6110
6111 cpu_buffer->meta_page = page_to_virt(page);
6112
6113 return 0;
6114}
6115
6116static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
6117{
6118 unsigned long addr = (unsigned long)cpu_buffer->meta_page;
6119
6120 free_page(addr);
6121 cpu_buffer->meta_page = NULL;
6122}
6123
6124static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
6125 unsigned long *subbuf_ids)
6126{
6127 struct trace_buffer_meta *meta = cpu_buffer->meta_page;
6128 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
6129 struct buffer_page *first_subbuf, *subbuf;
6130 int id = 0;
6131
6132 subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
6133 cpu_buffer->reader_page->id = id++;
6134
6135 first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
6136 do {
6137 if (WARN_ON(id >= nr_subbufs))
6138 break;
6139
6140 subbuf_ids[id] = (unsigned long)subbuf->page;
6141 subbuf->id = id;
6142
6143 rb_inc_page(&subbuf);
6144 id++;
6145 } while (subbuf != first_subbuf);
6146
6147 /* install subbuf ID to kern VA translation */
6148 cpu_buffer->subbuf_ids = subbuf_ids;
6149
6150 meta->meta_page_size = PAGE_SIZE;
6151 meta->meta_struct_len = sizeof(*meta);
6152 meta->nr_subbufs = nr_subbufs;
6153 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
6154
6155 rb_update_meta_page(cpu_buffer);
6156}
6157
6158static struct ring_buffer_per_cpu *
6159rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
6160{
6161 struct ring_buffer_per_cpu *cpu_buffer;
6162
6163 if (!cpumask_test_cpu(cpu, buffer->cpumask))
6164 return ERR_PTR(-EINVAL);
6165
6166 cpu_buffer = buffer->buffers[cpu];
6167
6168 mutex_lock(&cpu_buffer->mapping_lock);
6169
6170 if (!cpu_buffer->mapped) {
6171 mutex_unlock(&cpu_buffer->mapping_lock);
6172 return ERR_PTR(-ENODEV);
6173 }
6174
6175 return cpu_buffer;
6176}
6177
6178static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
6179{
6180 mutex_unlock(&cpu_buffer->mapping_lock);
6181}
6182
6183/*
6184 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need
6185 * to be set-up or torn-down.
6186 */
6187static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer,
6188 bool inc)
6189{
6190 unsigned long flags;
6191
6192 lockdep_assert_held(&cpu_buffer->mapping_lock);
6193
6194 if (inc && cpu_buffer->mapped == UINT_MAX)
6195 return -EBUSY;
6196
6197 if (WARN_ON(!inc && cpu_buffer->mapped == 0))
6198 return -EINVAL;
6199
6200 mutex_lock(&cpu_buffer->buffer->mutex);
6201 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6202
6203 if (inc)
6204 cpu_buffer->mapped++;
6205 else
6206 cpu_buffer->mapped--;
6207
6208 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6209 mutex_unlock(&cpu_buffer->buffer->mutex);
6210
6211 return 0;
6212}
6213
6214/*
6215 * +--------------+ pgoff == 0
6216 * | meta page |
6217 * +--------------+ pgoff == 1
6218 * | subbuffer 0 |
6219 * | |
6220 * +--------------+ pgoff == (1 + (1 << subbuf_order))
6221 * | subbuffer 1 |
6222 * | |
6223 * ...
6224 */
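/*
 * Illustrative note (not part of this file): with the layout above, the
 * byte offset user-space would pass to mmap() to start the mapping at
 * sub-buffer "n" of an already mapped CPU buffer is
 *
 *	off = (1 + ((off_t)n << subbuf_order)) * page_size;
 *
 * where page_size is the system page size; offset 0 maps the meta page
 * followed by the sub-buffers.
 */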
6225#ifdef CONFIG_MMU
6226static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer,
6227 struct vm_area_struct *vma)
6228{
6229 unsigned long nr_subbufs, nr_pages, vma_pages, pgoff = vma->vm_pgoff;
6230 unsigned int subbuf_pages, subbuf_order;
6231 struct page **pages;
6232 int p = 0, s = 0;
6233 int err;
6234
6235	/* Refuse MAP_PRIVATE or writable mappings */
6236 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC ||
6237 !(vma->vm_flags & VM_MAYSHARE))
6238 return -EPERM;
6239
6240 /*
6241 * Make sure the mapping cannot become writable later. Also tell the VM
6242 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND).
6243 */
6244 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP,
6245 VM_MAYWRITE);
6246
6247 lockdep_assert_held(&cpu_buffer->mapping_lock);
6248
6249 subbuf_order = cpu_buffer->buffer->subbuf_order;
6250 subbuf_pages = 1 << subbuf_order;
6251
6252 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */
6253 nr_pages = ((nr_subbufs) << subbuf_order) - pgoff + 1; /* + meta-page */
6254
6255 vma_pages = (vma->vm_end - vma->vm_start) >> PAGE_SHIFT;
6256 if (!vma_pages || vma_pages > nr_pages)
6257 return -EINVAL;
6258
6259 nr_pages = vma_pages;
6260
6261 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL);
6262 if (!pages)
6263 return -ENOMEM;
6264
6265 if (!pgoff) {
6266 pages[p++] = virt_to_page(cpu_buffer->meta_page);
6267
6268 /*
6269 * TODO: Align sub-buffers on their size, once
6270 * vm_insert_pages() supports the zero-page.
6271 */
6272 } else {
6273 /* Skip the meta-page */
6274 pgoff--;
6275
6276 if (pgoff % subbuf_pages) {
6277 err = -EINVAL;
6278 goto out;
6279 }
6280
6281 s += pgoff / subbuf_pages;
6282 }
6283
6284 while (p < nr_pages) {
b9c6820f 6285 struct page *page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]);
117c3920
VD
6286 int off = 0;
6287
6288 if (WARN_ON_ONCE(s >= nr_subbufs)) {
6289 err = -EINVAL;
6290 goto out;
6291 }
6292
6293 for (; off < (1 << (subbuf_order)); off++, page++) {
6294 if (p >= nr_pages)
6295 break;
6296
6297 pages[p++] = page;
6298 }
6299 s++;
6300 }
6301
6302 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages);
6303
6304out:
6305 kfree(pages);
6306
6307 return err;
6308}
6309#else
6310static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer,
6311 struct vm_area_struct *vma)
6312{
6313 return -EOPNOTSUPP;
6314}
6315#endif
6316
6317int ring_buffer_map(struct trace_buffer *buffer, int cpu,
6318 struct vm_area_struct *vma)
6319{
6320 struct ring_buffer_per_cpu *cpu_buffer;
6321 unsigned long flags, *subbuf_ids;
6322 int err = 0;
6323
6324 if (!cpumask_test_cpu(cpu, buffer->cpumask))
6325 return -EINVAL;
6326
6327 cpu_buffer = buffer->buffers[cpu];
6328
6329 mutex_lock(&cpu_buffer->mapping_lock);
6330
6331 if (cpu_buffer->mapped) {
6332 err = __rb_map_vma(cpu_buffer, vma);
6333 if (!err)
6334 err = __rb_inc_dec_mapped(cpu_buffer, true);
6335 mutex_unlock(&cpu_buffer->mapping_lock);
6336 return err;
6337 }
6338
6339 /* prevent another thread from changing buffer/sub-buffer sizes */
6340 mutex_lock(&buffer->mutex);
6341
6342 err = rb_alloc_meta_page(cpu_buffer);
6343 if (err)
6344 goto unlock;
6345
6346 /* subbuf_ids include the reader while nr_pages does not */
6347 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL);
6348 if (!subbuf_ids) {
6349 rb_free_meta_page(cpu_buffer);
6350 err = -ENOMEM;
6351 goto unlock;
6352 }
6353
6354 atomic_inc(&cpu_buffer->resize_disabled);
6355
6356 /*
6357 * Lock all readers to block any subbuf swap until the subbuf IDs are
6358 * assigned.
6359 */
6360 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6361 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
6362 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6363
6364 err = __rb_map_vma(cpu_buffer, vma);
6365 if (!err) {
6366 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6367 cpu_buffer->mapped = 1;
6368 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6369 } else {
6370 kfree(cpu_buffer->subbuf_ids);
6371 cpu_buffer->subbuf_ids = NULL;
6372 rb_free_meta_page(cpu_buffer);
6373 }
6374
6375unlock:
6376 mutex_unlock(&buffer->mutex);
6377 mutex_unlock(&cpu_buffer->mapping_lock);
6378
6379 return err;
6380}
6381
6382int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
6383{
6384 struct ring_buffer_per_cpu *cpu_buffer;
6385 unsigned long flags;
6386 int err = 0;
6387
6388 if (!cpumask_test_cpu(cpu, buffer->cpumask))
6389 return -EINVAL;
6390
6391 cpu_buffer = buffer->buffers[cpu];
6392
6393 mutex_lock(&cpu_buffer->mapping_lock);
6394
6395 if (!cpu_buffer->mapped) {
6396 err = -ENODEV;
6397 goto out;
6398 } else if (cpu_buffer->mapped > 1) {
6399 __rb_inc_dec_mapped(cpu_buffer, false);
6400 goto out;
6401 }
6402
6403 mutex_lock(&buffer->mutex);
6404 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6405
6406 cpu_buffer->mapped = 0;
6407
6408 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6409
6410 kfree(cpu_buffer->subbuf_ids);
6411 cpu_buffer->subbuf_ids = NULL;
6412 rb_free_meta_page(cpu_buffer);
6413 atomic_dec(&cpu_buffer->resize_disabled);
6414
6415 mutex_unlock(&buffer->mutex);
6416
6417out:
6418 mutex_unlock(&cpu_buffer->mapping_lock);
6419
6420 return err;
6421}
6422
6423int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
6424{
6425 struct ring_buffer_per_cpu *cpu_buffer;
fe832be0
SRG
6426 struct buffer_page *reader;
6427 unsigned long missed_events;
117c3920
VD
6428 unsigned long reader_size;
6429 unsigned long flags;
6430
6431 cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
6432 if (IS_ERR(cpu_buffer))
6433 return (int)PTR_ERR(cpu_buffer);
6434
6435 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6436
6437consume:
6438 if (rb_per_cpu_empty(cpu_buffer))
6439 goto out;
6440
6441 reader_size = rb_page_size(cpu_buffer->reader_page);
6442
6443 /*
6444	 * There is data to be read on the current reader page; we can
 6445	 * return to the caller. But before that, we assume the caller will read
6446 * everything. Let's update the kernel reader accordingly.
6447 */
6448 if (cpu_buffer->reader_page->read < reader_size) {
6449 while (cpu_buffer->reader_page->read < reader_size)
6450 rb_advance_reader(cpu_buffer);
6451 goto out;
6452 }
6453
fe832be0
SRG
6454 reader = rb_get_reader_page(cpu_buffer);
6455 if (WARN_ON(!reader))
117c3920
VD
6456 goto out;
6457
fe832be0
SRG
6458 /* Check if any events were dropped */
6459 missed_events = cpu_buffer->lost_events;
6460
6461 if (cpu_buffer->reader_page != cpu_buffer->commit_page) {
6462 if (missed_events) {
6463 struct buffer_data_page *bpage = reader->page;
6464 unsigned int commit;
6465 /*
6466 * Use the real_end for the data size,
6467 * This gives us a chance to store the lost events
6468 * on the page.
6469 */
6470 if (reader->real_end)
6471 local_set(&bpage->commit, reader->real_end);
6472 /*
6473 * If there is room at the end of the page to save the
6474 * missed events, then record it there.
6475 */
6476 commit = rb_page_size(reader);
6477 if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
6478 memcpy(&bpage->data[commit], &missed_events,
6479 sizeof(missed_events));
6480 local_add(RB_MISSED_STORED, &bpage->commit);
6481 }
6482 local_add(RB_MISSED_EVENTS, &bpage->commit);
6483 }
6484 } else {
6485 /*
6486 * There really shouldn't be any missed events if the commit
6487 * is on the reader page.
6488 */
6489 WARN_ON_ONCE(missed_events);
6490 }
6491
6492 cpu_buffer->lost_events = 0;
6493
117c3920
VD
6494 goto consume;
6495
6496out:
6497 /* Some archs do not have data cache coherency between kernel and user-space */
6498 flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page));
6499
6500 rb_update_meta_page(cpu_buffer);
6501
6502 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6503 rb_put_mapped_buffer(cpu_buffer);
6504
6505 return 0;
6506}
6507
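/*
 * Usage sketch (illustrative only, not part of this file): how a tracing
 * file's ->mmap() handler might hand its vma to ring_buffer_map().
 * "struct example_info" and its fields are hypothetical; the real
 * plumbing lives in the tracing core.
 *
 *	static int example_buffer_mmap(struct file *filp,
 *				       struct vm_area_struct *vma)
 *	{
 *		struct example_info *info = filp->private_data;
 *
 *		return ring_buffer_map(info->buffer, info->cpu, vma);
 *	}
 *
 * User-space reads the meta page at offset 0 to find the current reader
 * sub-buffer and, once it has consumed it, asks the kernel to advance
 * the reader, which ends up in ring_buffer_map_get_reader() above.
 * ring_buffer_unmap() is called when the mapping goes away.
 */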
b32614c0
SAS
6508/*
6509 * We only allocate new buffers, never free them if the CPU goes down.
6510 * If we were to free the buffer, then the user would lose any trace that was in
6511 * the buffer.
6512 */
6513int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
554f786e 6514{
13292494 6515 struct trace_buffer *buffer;
9b94a8fb
SRRH
6516 long nr_pages_same;
6517 int cpu_i;
6518 unsigned long nr_pages;
554f786e 6519
13292494 6520 buffer = container_of(node, struct trace_buffer, node);
b32614c0
SAS
6521 if (cpumask_test_cpu(cpu, buffer->cpumask))
6522 return 0;
6523
6524 nr_pages = 0;
6525 nr_pages_same = 1;
6526 /* check if all cpu sizes are same */
6527 for_each_buffer_cpu(buffer, cpu_i) {
6528 /* fill in the size from first enabled cpu */
6529 if (nr_pages == 0)
6530 nr_pages = buffer->buffers[cpu_i]->nr_pages;
6531 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
6532 nr_pages_same = 0;
6533 break;
554f786e 6534 }
554f786e 6535 }
b32614c0
SAS
6536 /* allocate minimum pages, user can later expand it */
6537 if (!nr_pages_same)
6538 nr_pages = 2;
6539 buffer->buffers[cpu] =
6540 rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
6541 if (!buffer->buffers[cpu]) {
6542 WARN(1, "failed to allocate ring buffer on CPU %u\n",
6543 cpu);
6544 return -ENOMEM;
6545 }
6546 smp_wmb();
6547 cpumask_set_cpu(cpu, buffer->cpumask);
6548 return 0;
554f786e 6549}
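/*
 * Usage sketch (illustrative only, not part of this file): the callback
 * above is meant for the multi-instance CPU-hotplug state machine. The
 * registration happens elsewhere in the tracing code; roughly, the state
 * is set up once (name string illustrative) and each buffer is then
 * added as an instance:
 *
 *	ret = cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE,
 *				      "trace/RB:prepare",
 *				      trace_rb_cpu_prepare, NULL);
 *
 *	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE,
 *				       &buffer->node);
 */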
6c43e554
SRRH
6550
6551#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
6552/*
6553 * This is a basic integrity check of the ring buffer.
6554 * Late in the boot cycle this test will run when configured in.
6555 * It will kick off a thread per CPU that will go into a loop
6556 * writing to the per cpu ring buffer various sizes of data.
6557 * Some of the data will be large items, some small.
6558 *
6559 * Another thread is created that goes into a spin, sending out
6560 * IPIs to the other CPUs to also write into the ring buffer.
6562 * This is to test the nesting ability of the buffer.
6562 *
6563 * Basic stats are recorded and reported. If something unexpected
 6564 * happens in the ring buffer, a big warning
6565 * is displayed and all ring buffers are disabled.
6566 */
6567static struct task_struct *rb_threads[NR_CPUS] __initdata;
6568
6569struct rb_test_data {
13292494 6570 struct trace_buffer *buffer;
6c43e554
SRRH
6571 unsigned long events;
6572 unsigned long bytes_written;
6573 unsigned long bytes_alloc;
6574 unsigned long bytes_dropped;
6575 unsigned long events_nested;
6576 unsigned long bytes_written_nested;
6577 unsigned long bytes_alloc_nested;
6578 unsigned long bytes_dropped_nested;
6579 int min_size_nested;
6580 int max_size_nested;
6581 int max_size;
6582 int min_size;
6583 int cpu;
6584 int cnt;
6585};
6586
6587static struct rb_test_data rb_data[NR_CPUS] __initdata;
6588
6589/* 1 meg per cpu */
6590#define RB_TEST_BUFFER_SIZE 1048576
6591
6592static char rb_string[] __initdata =
6593 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
6594 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
6595 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
6596
6597static bool rb_test_started __initdata;
6598
6599struct rb_item {
6600 int size;
6601 char str[];
6602};
6603
6604static __init int rb_write_something(struct rb_test_data *data, bool nested)
6605{
6606 struct ring_buffer_event *event;
6607 struct rb_item *item;
6608 bool started;
6609 int event_len;
6610 int size;
6611 int len;
6612 int cnt;
6613
6614	/* Have nested writes different than what is written */
6615 cnt = data->cnt + (nested ? 27 : 0);
6616
6617 /* Multiply cnt by ~e, to make some unique increment */
40ed29b3 6618 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
6c43e554
SRRH
6619
6620 len = size + sizeof(struct rb_item);
6621
6622 started = rb_test_started;
6623 /* read rb_test_started before checking buffer enabled */
6624 smp_rmb();
6625
6626 event = ring_buffer_lock_reserve(data->buffer, len);
6627 if (!event) {
6628 /* Ignore dropped events before test starts. */
6629 if (started) {
6630 if (nested)
6631				data->bytes_dropped_nested += len;
 6632			else
 6633				data->bytes_dropped += len;
6634 }
6635 return len;
6636 }
6637
6638 event_len = ring_buffer_event_length(event);
6639
6640 if (RB_WARN_ON(data->buffer, event_len < len))
6641 goto out;
6642
6643 item = ring_buffer_event_data(event);
6644 item->size = size;
6645 memcpy(item->str, rb_string, size);
6646
6647 if (nested) {
6648 data->bytes_alloc_nested += event_len;
6649 data->bytes_written_nested += len;
6650 data->events_nested++;
6651 if (!data->min_size_nested || len < data->min_size_nested)
6652 data->min_size_nested = len;
6653 if (len > data->max_size_nested)
6654 data->max_size_nested = len;
6655 } else {
6656 data->bytes_alloc += event_len;
6657 data->bytes_written += len;
6658 data->events++;
6659 if (!data->min_size || len < data->min_size)
6660			data->min_size = len;
6661 if (len > data->max_size)
6662 data->max_size = len;
6663 }
6664
6665 out:
04aabc32 6666 ring_buffer_unlock_commit(data->buffer);
6c43e554
SRRH
6667
6668 return 0;
6669}
6670
6671static __init int rb_test(void *arg)
6672{
6673 struct rb_test_data *data = arg;
6674
6675 while (!kthread_should_stop()) {
6676 rb_write_something(data, false);
6677 data->cnt++;
6678
6679 set_current_state(TASK_INTERRUPTIBLE);
6680 /* Now sleep between a min of 100-300us and a max of 1ms */
6681 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
6682 }
6683
6684 return 0;
6685}
6686
6687static __init void rb_ipi(void *ignore)
6688{
6689 struct rb_test_data *data;
6690 int cpu = smp_processor_id();
6691
6692 data = &rb_data[cpu];
6693 rb_write_something(data, true);
6694}
6695
6696static __init int rb_hammer_test(void *arg)
6697{
6698 while (!kthread_should_stop()) {
6699
6700 /* Send an IPI to all cpus to write data! */
6701 smp_call_function(rb_ipi, NULL, 1);
6702 /* No sleep, but for non preempt, let others run */
6703 schedule();
6704 }
6705
6706 return 0;
6707}
6708
6709static __init int test_ringbuffer(void)
6710{
6711 struct task_struct *rb_hammer;
13292494 6712 struct trace_buffer *buffer;
6c43e554
SRRH
6713 int cpu;
6714 int ret = 0;
6715
a356646a 6716 if (security_locked_down(LOCKDOWN_TRACEFS)) {
ee195452 6717 pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
a356646a
SRV
6718 return 0;
6719 }
6720
6c43e554
SRRH
6721 pr_info("Running ring buffer tests...\n");
6722
6723 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
6724 if (WARN_ON(!buffer))
6725 return 0;
6726
6727 /* Disable buffer so that threads can't write to it yet */
6728 ring_buffer_record_off(buffer);
6729
6730 for_each_online_cpu(cpu) {
6731 rb_data[cpu].buffer = buffer;
6732 rb_data[cpu].cpu = cpu;
6733 rb_data[cpu].cnt = cpu;
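		/*
		 * Each tester thread is bound to its CPU, so its writes and
		 * the IPI's nested writes land in the same per-CPU buffer.
		 */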
6734 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
6735 cpu, "rbtester/%u");
62277de7 6736 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
6c43e554 6737 pr_cont("FAILED\n");
62277de7 6738 ret = PTR_ERR(rb_threads[cpu]);
6739 goto out_free;
6740 }
6741 }
6742
6743 /* Now create the rb hammer! */
6744 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
62277de7 6745 if (WARN_ON(IS_ERR(rb_hammer))) {
6c43e554 6746 pr_cont("FAILED\n");
62277de7 6747 ret = PTR_ERR(rb_hammer);
6748 goto out_free;
6749 }
6750
6751 ring_buffer_record_on(buffer);
6752 /*
6753 * Show buffer is enabled before setting rb_test_started.
6754 * Yes there's a small race window where events could be
 6755	 * dropped and the thread won't catch it. But when a ring
6756 * buffer gets enabled, there will always be some kind of
6757 * delay before other CPUs see it. Thus, we don't care about
6758 * those dropped events. We care about events dropped after
6759 * the threads see that the buffer is active.
6760 */
6761 smp_wmb();
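	/* Pairs with the smp_rmb() in rb_write_something(). */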
6762 rb_test_started = true;
6763
6764 set_current_state(TASK_INTERRUPTIBLE);
 6765	/* Just run for 10 seconds */
6766 schedule_timeout(10 * HZ);
6767
6768 kthread_stop(rb_hammer);
6769
6770 out_free:
6771 for_each_online_cpu(cpu) {
6772 if (!rb_threads[cpu])
6773 break;
6774 kthread_stop(rb_threads[cpu]);
6775 }
6776 if (ret) {
6777 ring_buffer_free(buffer);
6778 return ret;
6779 }
6780
6781 /* Report! */
6782 pr_info("finished\n");
6783 for_each_online_cpu(cpu) {
6784 struct ring_buffer_event *event;
6785 struct rb_test_data *data = &rb_data[cpu];
6786 struct rb_item *item;
6787 unsigned long total_events;
6788 unsigned long total_dropped;
6789 unsigned long total_written;
6790 unsigned long total_alloc;
6791 unsigned long total_read = 0;
6792 unsigned long total_size = 0;
6793 unsigned long total_len = 0;
6794 unsigned long total_lost = 0;
6795 unsigned long lost;
6796 int big_event_size;
6797 int small_event_size;
6798
6799 ret = -1;
6800
6801 total_events = data->events + data->events_nested;
6802 total_written = data->bytes_written + data->bytes_written_nested;
6803 total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
6804 total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
6805
6806 big_event_size = data->max_size + data->max_size_nested;
6807 small_event_size = data->min_size + data->min_size_nested;
6808
6809 pr_info("CPU %d:\n", cpu);
6810 pr_info(" events: %ld\n", total_events);
6811 pr_info(" dropped bytes: %ld\n", total_dropped);
6812 pr_info(" alloced bytes: %ld\n", total_alloc);
6813 pr_info(" written bytes: %ld\n", total_written);
6814 pr_info(" biggest event: %d\n", big_event_size);
6815 pr_info(" smallest event: %d\n", small_event_size);
6816
6817 if (RB_WARN_ON(buffer, total_dropped))
6818 break;
6819
6820 ret = 0;
6821
6822 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
6823 total_lost += lost;
6824 item = ring_buffer_event_data(event);
6825 total_len += ring_buffer_event_length(event);
6826 total_size += item->size + sizeof(struct rb_item);
6827 if (memcmp(&item->str[0], rb_string, item->size) != 0) {
6828 pr_info("FAILED!\n");
6829 pr_info("buffer had: %.*s\n", item->size, item->str);
6830 pr_info("expected: %.*s\n", item->size, rb_string);
6831 RB_WARN_ON(buffer, 1);
6832 ret = -1;
6833 break;
6834 }
6835 total_read++;
6836 }
6837 if (ret)
6838 break;
6839
6840 ret = -1;
6841
6842 pr_info(" read events: %ld\n", total_read);
6843 pr_info(" lost events: %ld\n", total_lost);
6844 pr_info(" total events: %ld\n", total_lost + total_read);
6845 pr_info(" recorded len bytes: %ld\n", total_len);
6846 pr_info(" recorded size bytes: %ld\n", total_size);
ed888241 6847 if (total_lost) {
6848 pr_info(" With dropped events, record len and size may not match\n"
6849 " alloced and written from above\n");
ed888241 6850 } else {
6851 if (RB_WARN_ON(buffer, total_len != total_alloc ||
6852 total_size != total_written))
6853 break;
6854 }
6855 if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
6856 break;
6857
6858 ret = 0;
6859 }
6860 if (!ret)
6861 pr_info("Ring buffer PASSED!\n");
6862
6863 ring_buffer_free(buffer);
6864 return 0;
6865}
6866
6867late_initcall(test_ringbuffer);
6868#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */