/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/sched.h>	/* used for sched_clock() (for now) */
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

u64 ring_buffer_time_stamp(int cpu)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return sched_clock() << DEBUG_SHIFT;
}

void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
#define RB_ALIGNMENT_SHIFT	2
#define RB_ALIGNMENT		(1 << RB_ALIGNMENT_SHIFT)
#define RB_MAX_SMALL_DATA	28

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};
/* inline for ring buffer fast paths */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		/* undefined */
		return -1;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		if (event->len)
			length = event->len << RB_ALIGNMENT_SHIFT;
		else
			length = event->array[0];
		return length + RB_EVNT_HDR_SIZE;
	default:
		BUG();
	}
	/* not hit */
	return 0;
}
/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	return rb_event_length(event);
}
/* inline for ring buffer fast paths */
static inline void *
rb_event_data(struct ring_buffer_event *event)
{
	BUG_ON(event->type != RINGBUF_TYPE_DATA);
	/* If length is in len field, then array[0] has the data */
	if (event->len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}
/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
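
/*
 * Illustrative sketch, NOT part of the original file: how the two data
 * encodings above fit together. Small payloads pack their length into
 * event->len in RB_ALIGNMENT units and the data starts at array[0];
 * larger payloads set len to zero, store the byte count in array[0],
 * and the data starts at array[1]. Assumes the 4-byte event header
 * declared in linux/ring_buffer.h.
 */
static void __maybe_unused example_event_encoding(void)
{
	unsigned char buf[64] = { 0 };	/* hypothetical scratch event */
	struct ring_buffer_event *event = (struct ring_buffer_event *)buf;

	/* small payload: 8 bytes -> len = 8 >> RB_ALIGNMENT_SHIFT = 2 */
	event->type = RINGBUF_TYPE_DATA;
	event->len = 8 >> RB_ALIGNMENT_SHIFT;
	WARN_ON(rb_event_length(event) != 8 + RB_EVNT_HDR_SIZE);
	WARN_ON(ring_buffer_event_data(event) != (void *)&event->array[0]);

	/* large payload: 40 bytes -> len = 0, byte count in array[0] */
	event->len = 0;
	event->array[0] = 40;
	WARN_ON(rb_event_length(event) != 40 + RB_EVNT_HDR_SIZE);
	WARN_ON(ring_buffer_event_data(event) != (void *)&event->array[1]);
}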
#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu_mask(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)
/*
 * This hack stolen from mm/slob.c.
 * We can store per page timing information in the page frame of the page.
 * Thanks to Peter Zijlstra for suggesting this idea.
 */
struct buffer_page {
	u64		 time_stamp;	/* page time stamp */
	unsigned	 size;		/* size of page data */
	unsigned	 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	struct list_head list;		/* list of free pages */
	void		*page;		/* Actual data page */
};
/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static inline void free_buffer_page(struct buffer_page *bpage)
{
	if (bpage->page)
		__free_page(bpage->page);
	kfree(bpage);
}
/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}
#define BUF_PAGE_SIZE PAGE_SIZE

/*
 * If head_page == tail_page && head == tail, then the buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	struct ring_buffer		*buffer;
	spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct list_head		pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*reader_page;
	unsigned long			overrun;
	unsigned long			entries;
	u64				write_stamp;
	u64				read_stamp;
	atomic_t			record_disabled;
};

struct ring_buffer {
	unsigned long			size;
	unsigned			pages;
	unsigned			flags;
	int				cpus;
	cpumask_t			cpumask;
	atomic_t			record_disabled;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;
};
struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	u64				read_stamp;
};
#define RB_WARN_ON(buffer, cond)				\
	if (unlikely(cond)) {					\
		atomic_inc(&buffer->record_disabled);		\
		WARN_ON(1);					\
	}
/**
 * check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;

	RB_WARN_ON(cpu_buffer, head->next->prev != head);
	RB_WARN_ON(cpu_buffer, head->prev->next != head);

	list_for_each_entry_safe(page, tmp, head, list) {
		RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
		RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
	}

	return 0;
}
static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
{
	return cpu_buffer->head_page->size;
}
static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;
	unsigned long addr;
	LIST_HEAD(pages);
	unsigned i;

	for (i = 0; i < nr_pages; i++) {
		page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
				    GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
		if (!page)
			goto free_pages;
		list_add(&page->list, &pages);

		addr = __get_free_page(GFP_KERNEL);
		if (!addr)
			goto free_pages;
		page->page = (void *)addr;
	}

	list_splice(&pages, head);

	rb_check_pages(cpu_buffer);

	return 0;

 free_pages:
	list_for_each_entry_safe(page, tmp, &pages, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	return -ENOMEM;
}
static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *page;
	unsigned long addr;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	spin_lock_init(&cpu_buffer->lock);
	INIT_LIST_HEAD(&cpu_buffer->pages);

	page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!page)
		goto fail_free_buffer;

	cpu_buffer->reader_page = page;
	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		goto fail_free_reader;
	page->page = (void *)addr;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	cpu_buffer->reader_page->size = 0;

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	cpu_buffer->tail_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}
static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;

	list_del_init(&cpu_buffer->reader_page->list);
	free_buffer_page(cpu_buffer->reader_page);

	list_for_each_entry_safe(page, tmp, head, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	kfree(cpu_buffer);
}
/*
 * Causes compile errors if the struct buffer_page gets bigger
 * than the struct page.
 */
extern int ring_buffer_page_too_big(void);
/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* Paranoid! Optimizes out when all is well */
	if (sizeof(struct buffer_page) > sizeof(struct page))
		ring_buffer_page_too_big();

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;

	/* need at least two pages */
	if (buffer->pages == 1)
		buffer->pages++;

	buffer->cpumask = cpu_possible_map;
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_buffer;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
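
/*
 * Illustrative sketch, NOT part of the original file: allocate a 1 MB
 * buffer in overwrite mode and free it again. The size is rounded up
 * to whole pages internally.
 */
static int __maybe_unused example_alloc_free(void)
{
	struct ring_buffer *buffer;

	buffer = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	ring_buffer_free(buffer);
	return 0;
}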
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer);
}
static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *page;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		BUG_ON(list_empty(&cpu_buffer->pages));
		p = cpu_buffer->pages.next;
		page = list_entry(p, struct buffer_page, list);
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	BUG_ON(list_empty(&cpu_buffer->pages));

	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}
static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *page;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		BUG_ON(list_empty(pages));
		p = pages->next;
		page = list_entry(p, struct buffer_page, list);
		list_del_init(&page->list);
		list_add_tail(&page->list, &cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}
/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * The tracer is responsible for making sure that the buffer is
 * not being used while changing the size.
 * Note: We may be able to change the above requirement by using
 *  RCU synchronizations.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns the new size on success, or -ENOMEM on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *page, *tmp;
	unsigned long buffer_size;
	unsigned long addr;
	LIST_HEAD(pages);
	int i, cpu;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	mutex_lock(&buffer->mutex);

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		BUG_ON(nr_pages >= buffer->pages);

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM;
	 */
	BUG_ON(nr_pages <= buffer->pages);
	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			page = kzalloc_node(ALIGN(sizeof(*page),
						  cache_line_size()),
					    GFP_KERNEL, cpu_to_node(cpu));
			if (!page)
				goto free_pages;
			list_add(&page->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			page->page = (void *)addr;
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	BUG_ON(!list_empty(&pages));

 out:
	buffer->pages = nr_pages;
	mutex_unlock(&buffer->mutex);

	return size;

 free_pages:
	list_for_each_entry_safe(page, tmp, &pages, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	mutex_unlock(&buffer->mutex);
	return -ENOMEM;
}
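
/*
 * Illustrative sketch, NOT part of the original file: growing a buffer
 * to 64 data pages. On failure the old size is left intact.
 */
static int __maybe_unused example_grow(struct ring_buffer *buffer)
{
	int ret;

	ret = ring_buffer_resize(buffer, 64 * BUF_PAGE_SIZE);
	if (ret < 0)
		return ret;	/* -ENOMEM */
	return 0;
}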
static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
	return cpu_buffer->reader_page->read == cpu_buffer->reader_page->size &&
		(cpu_buffer->tail_page == cpu_buffer->reader_page ||
		 (cpu_buffer->tail_page == cpu_buffer->head_page &&
		  cpu_buffer->head_page->read ==
		  cpu_buffer->tail_page->write));
}

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type == RINGBUF_TYPE_PADDING;
}

static inline void *__rb_page_index(struct buffer_page *page, unsigned index)
{
	return page->page + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->head_page,
			       cpu_buffer->head_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}
/*
 * When the tail hits the head and the buffer is in overwrite mode,
 * the head jumps to the next page and all content on the previous
 * page is discarded. But before doing so, we update the overrun
 * variable of the buffer.
 */
static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	unsigned long head;

	for (head = 0; head < rb_head_size(cpu_buffer);
	     head += rb_event_length(event)) {

		event = __rb_page_index(cpu_buffer->head_page, head);
		BUG_ON(rb_null_event(event));
		/* Only count data entries */
		if (event->type != RINGBUF_TYPE_DATA)
			continue;
		cpu_buffer->overrun++;
		cpu_buffer->entries--;
	}
}
static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **page)
{
	struct list_head *p = (*page)->list.next;

	if (p == &cpu_buffer->pages)
		p = p->next;

	*page = list_entry(p, struct buffer_page, list);
}
static inline void
rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
{
	cpu_buffer->tail_page->time_stamp = *ts;
	cpu_buffer->write_stamp = *ts;
}

static void rb_reset_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page->read = 0;
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}
static inline void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = cpu_buffer->head_page;
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->time_stamp;
	iter->head = 0;
}
/**
 * ring_buffer_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static inline void
rb_update_event(struct ring_buffer_event *event,
		unsigned type, unsigned length)
{
	event->type = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
		break;

	case RINGBUF_TYPE_TIME_EXTEND:
		event->len =
			(RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_TIME_STAMP:
		event->len =
			(RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_DATA:
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA) {
			event->len = 0;
			event->array[0] = length;
		} else
			event->len =
				(length + (RB_ALIGNMENT-1))
				>> RB_ALIGNMENT_SHIFT;
		break;
	default:
		BUG();
	}
}
static inline unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusions */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	return length;
}
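
/*
 * Worked example, NOT part of the original file, assuming the 4-byte
 * event header: a 5-byte payload becomes 5 + 4 = 9, aligned up to 12;
 * a 30-byte payload exceeds RB_MAX_SMALL_DATA, so 4 extra bytes are
 * reserved for the length word in array[0]: 30 + 4 + 4 = 38 -> 40.
 */
static void __maybe_unused example_event_length(void)
{
	WARN_ON(rb_calculate_event_length(5) != 12);
	WARN_ON(rb_calculate_event_length(30) != 40);
}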
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned type, unsigned long length, u64 *ts)
{
	struct buffer_page *tail_page, *head_page, *reader_page;
	unsigned long tail;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct ring_buffer_event *event;

	tail_page = cpu_buffer->tail_page;
	tail = cpu_buffer->tail_page->write;

	if (tail + length > BUF_PAGE_SIZE) {
		struct buffer_page *next_page = tail_page;

		spin_lock(&cpu_buffer->lock);
		rb_inc_page(cpu_buffer, &next_page);

		head_page = cpu_buffer->head_page;
		reader_page = cpu_buffer->reader_page;

		/* we grabbed the lock before incrementing */
		WARN_ON(next_page == reader_page);

		if (next_page == head_page) {
			if (!(buffer->flags & RB_FL_OVERWRITE)) {
				spin_unlock(&cpu_buffer->lock);
				return NULL;
			}

			/* count overflows */
			rb_update_overflow(cpu_buffer);

			rb_inc_page(cpu_buffer, &head_page);
			cpu_buffer->head_page = head_page;
			rb_reset_head_page(cpu_buffer);
		}

		if (tail != BUF_PAGE_SIZE) {
			event = __rb_page_index(tail_page, tail);
			/* page padding */
			event->type = RINGBUF_TYPE_PADDING;
		}

		tail_page->size = tail;
		tail_page = next_page;
		tail_page->size = 0;
		tail = 0;
		cpu_buffer->tail_page = tail_page;
		cpu_buffer->tail_page->write = tail;
		rb_add_stamp(cpu_buffer, ts);
		spin_unlock(&cpu_buffer->lock);
	}

	BUG_ON(tail + length > BUF_PAGE_SIZE);

	event = __rb_page_index(tail_page, tail);
	rb_update_event(event, type, length);

	return event;
}
static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  u64 *ts, u64 *delta)
{
	struct ring_buffer_event *event;
	static int once;

	if (unlikely(*delta > (1ULL << 59) && !once++)) {
		printk(KERN_WARNING "Delta way too big! %llu"
		       " ts=%llu write stamp = %llu\n",
		       *delta, *ts, cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big, we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -EBUSY;

	/* check to see if we went to the next page */
	if (cpu_buffer->tail_page->write) {
		/* Still on same page, update timestamp */
		event->time_delta = *delta & TS_MASK;
		event->array[0] = *delta >> TS_SHIFT;
		/* commit the time event */
		cpu_buffer->tail_page->write +=
			rb_event_length(event);
		cpu_buffer->write_stamp = *ts;
		*delta = 0;
	}

	return 0;
}
static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned type, unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int ret;

	ts = ring_buffer_time_stamp(cpu_buffer->cpu);

	if (cpu_buffer->tail_page->write) {
		delta = ts - cpu_buffer->write_stamp;

		if (test_time_stamp(delta)) {
			ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
			if (ret)
				return NULL;
		}
	} else {
		spin_lock(&cpu_buffer->lock);
		rb_add_stamp(cpu_buffer, &ts);
		spin_unlock(&cpu_buffer->lock);
		delta = 0;
	}

	event = __rb_reserve_next(cpu_buffer, type, length, &ts);
	if (!event)
		return NULL;

	/* If the reserve went to the next page, our delta is zero */
	if (!cpu_buffer->tail_page->write)
		delta = 0;

	event->time_delta = delta;

	return event;
}
/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 * @flags: a pointer to save the interrupt flags
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
 *
 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
 * If NULL is returned, then nothing has been allocated or locked.
 */
struct ring_buffer_event *
ring_buffer_lock_reserve(struct ring_buffer *buffer,
			 unsigned long length,
			 unsigned long *flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int cpu;

	if (atomic_read(&buffer->record_disabled))
		return NULL;

	local_irq_save(*flags);
	cpu = raw_smp_processor_id();

	if (!cpu_isset(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	length = rb_calculate_event_length(length);
	if (length > BUF_PAGE_SIZE)
		goto out;

	event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
	if (!event)
		goto out;

	return event;

 out:
	local_irq_restore(*flags);
	return NULL;
}
static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
		      struct ring_buffer_event *event)
{
	cpu_buffer->tail_page->write += rb_event_length(event);
	cpu_buffer->tail_page->size = cpu_buffer->tail_page->write;
	cpu_buffer->write_stamp += event->time_delta;
	cpu_buffer->entries++;
}
/**
 * ring_buffer_unlock_commit - commit a reserved event
 * @buffer: The buffer to commit to
 * @event: The event pointer to commit.
 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
 *
 * This commits the data to the ring buffer, and releases any locks held.
 *
 * Must be paired with ring_buffer_lock_reserve.
 */
int ring_buffer_unlock_commit(struct ring_buffer *buffer,
			      struct ring_buffer_event *event,
			      unsigned long flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu = raw_smp_processor_id();

	cpu_buffer = buffer->buffers[cpu];

	rb_commit(cpu_buffer, event);

	local_irq_restore(flags);

	return 0;
}
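
/*
 * Illustrative sketch, NOT part of the original file: the expected
 * reserve/fill/commit sequence. "struct example_entry" is a
 * hypothetical caller-defined payload.
 */
struct example_entry {
	int		pid;
	unsigned long	counter;
};

static int __maybe_unused example_reserve_commit(struct ring_buffer *buffer)
{
	struct ring_buffer_event *event;
	struct example_entry *entry;
	unsigned long flags;

	event = ring_buffer_lock_reserve(buffer, sizeof(*entry), &flags);
	if (!event)
		return -EBUSY;

	/* write the payload directly into the reserved slot */
	entry = ring_buffer_event_data(event);
	entry->pid = current->pid;
	entry->counter = 0;

	return ring_buffer_unlock_commit(buffer, event, flags);
}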
/**
 * ring_buffer_write - write data to the buffer without reserving
 * @buffer: The ring buffer to write to.
 * @length: The length of the data being written (excluding the event header)
 * @data: The data to write to the buffer.
 *
 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
 * one function. If you already have the data to write to the buffer, it
 * may be easier to simply call this function.
 *
 * Note, like ring_buffer_lock_reserve, the length is the length of the data
 * and not the length of the event which would hold the header.
 */
int ring_buffer_write(struct ring_buffer *buffer,
		      unsigned long length,
		      void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long event_length, flags;
	void *body;
	int ret = -EBUSY;
	int cpu;

	if (atomic_read(&buffer->record_disabled))
		return -EBUSY;

	local_irq_save(flags);
	cpu = raw_smp_processor_id();

	if (!cpu_isset(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	event_length = rb_calculate_event_length(length);
	event = rb_reserve_next_event(cpu_buffer,
				      RINGBUF_TYPE_DATA, event_length);
	if (!event)
		goto out;

	body = rb_event_data(event);

	memcpy(body, data, length);

	rb_commit(cpu_buffer, event);

	ret = 0;
 out:
	local_irq_restore(flags);

	return ret;
}
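
/*
 * Illustrative sketch, NOT part of the original file: the one-shot
 * write path for callers that already have the payload assembled.
 */
static int __maybe_unused example_write(struct ring_buffer *buffer)
{
	char payload[] = "hello";

	return ring_buffer_write(buffer, sizeof(payload), payload);
}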
/**
 * ring_buffer_record_disable - stop all writes into the buffer
 * @buffer: The ring buffer to stop writes to.
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable(struct ring_buffer *buffer)
{
	atomic_inc(&buffer->record_disabled);
}
/**
 * ring_buffer_record_enable - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable(struct ring_buffer *buffer)
{
	atomic_dec(&buffer->record_disabled);
}
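
/*
 * Illustrative sketch, NOT part of the original file: pausing recording
 * around an inspection of the buffer. synchronize_sched() lets writers
 * already in flight on other CPUs finish first, as the comment on
 * ring_buffer_record_disable() requests.
 */
static void __maybe_unused example_pause_recording(struct ring_buffer *buffer)
{
	ring_buffer_record_disable(buffer);
	synchronize_sched();

	/* ... read or reset the buffer here ... */

	ring_buffer_record_enable(buffer);
}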
/**
 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
 * @buffer: The ring buffer to stop writes to.
 * @cpu: The CPU buffer to stop
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_inc(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_record_enable_cpu - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 * @cpu: The CPU to enable.
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_dec(&cpu_buffer->record_disabled);
}
/**
 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the entries from.
 */
unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->entries;
}

/**
 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->overrun;
}
/**
 * ring_buffer_entries - get the number of entries in a buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of entries in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_entries(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long entries = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		entries += cpu_buffer->entries;
	}

	return entries;
}
/**
 * ring_buffer_overruns - get the number of overruns in the buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += cpu_buffer->overrun;
	}

	return overruns;
}
/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/* Iterator usage is expected to have record disabled */
	if (list_empty(&cpu_buffer->reader_page->list)) {
		iter->head_page = cpu_buffer->head_page;
		iter->head = cpu_buffer->head_page->read;
	} else {
		iter->head_page = cpu_buffer->reader_page;
		iter->head = cpu_buffer->reader_page->read;
	}
	if (iter->head)
		iter->read_stamp = cpu_buffer->read_stamp;
	else
		iter->read_stamp = iter->head_page->time_stamp;
}
/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	return iter->head_page == cpu_buffer->tail_page &&
		iter->head == cpu_buffer->tail_page->write;
}
static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
}
static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
}
static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->lock, flags);

 again:
	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < reader->size)
		goto out;

	/* Never should we have an index greater than the size */
	WARN_ON(cpu_buffer->reader_page->read > reader->size);

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		goto out;

	/*
	 * Splice the empty reader page into the list around the head.
	 * Reset the reader page to size zero.
	 */
	reader = cpu_buffer->head_page;
	cpu_buffer->reader_page->list.next = reader->list.next;
	cpu_buffer->reader_page->list.prev = reader->list.prev;
	cpu_buffer->reader_page->size = 0;

	/* Make the reader page now replace the head */
	reader->list.prev->next = &cpu_buffer->reader_page->list;
	reader->list.next->prev = &cpu_buffer->reader_page->list;

	/*
	 * If the tail is on the reader, then we must set the head
	 * to the inserted page, otherwise we set it one before.
	 */
	cpu_buffer->head_page = cpu_buffer->reader_page;

	if (cpu_buffer->tail_page != reader)
		rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	rb_reset_reader_page(cpu_buffer);

	goto again;

 out:
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);

	return reader;
}
static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	BUG_ON(!reader);

	event = rb_reader_event(cpu_buffer);

	if (event->type == RINGBUF_TYPE_DATA)
		cpu_buffer->entries--;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
}
static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned length;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->head >= iter->head_page->size) {
		BUG_ON(iter->head_page == cpu_buffer->tail_page);
		rb_inc_iter(iter);
		return;
	}

	event = rb_iter_head_event(iter);

	length = rb_event_length(event);

	/*
	 * This should not be called to advance the header if we are
	 * at the tail of the buffer.
	 */
	BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
	       (iter->head + length > cpu_buffer->tail_page->write));

	rb_update_iter_read_stamp(iter, event);

	iter->head += length;

	/* check for end of page padding */
	if ((iter->head >= iter->head_page->size) &&
	    (iter->head_page != cpu_buffer->tail_page))
		rb_advance_iter(iter);
}
/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	struct buffer_page *reader;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

 again:
	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		RB_WARN_ON(cpu_buffer, 1);
		rb_advance_reader(cpu_buffer);
		return NULL;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = cpu_buffer->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;

	if (ring_buffer_iter_empty(iter))
		return NULL;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

 again:
	if (rb_per_cpu_empty(cpu_buffer))
		return NULL;

	event = rb_iter_head_event(iter);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		rb_inc_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = iter->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning, that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	event = ring_buffer_peek(buffer, cpu, ts);
	if (!event)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];
	rb_advance_reader(cpu_buffer);

	return event;
}
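
/*
 * Illustrative sketch, NOT part of the original file: draining one
 * CPU's events with the consuming read interface.
 */
static void __maybe_unused example_consume_all(struct ring_buffer *buffer,
					       int cpu)
{
	struct ring_buffer_event *event;
	u64 ts;

	while ((event = ring_buffer_consume(buffer, cpu, &ts)))
		printk(KERN_INFO "event length=%u ts=%llu\n",
		       ring_buffer_event_length(event),
		       (unsigned long long)ts);
}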
/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_iter *iter;
	unsigned long flags;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
	if (!iter)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

	iter->cpu_buffer = cpu_buffer;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	spin_lock_irqsave(&cpu_buffer->lock, flags);
	ring_buffer_iter_reset(iter);
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);

	return iter;
}
/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	atomic_dec(&cpu_buffer->record_disabled);
	kfree(iter);
}
/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;

	event = ring_buffer_iter_peek(iter, ts);
	if (!event)
		return NULL;

	rb_advance_iter(iter);

	return event;
}
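
/*
 * Illustrative sketch, NOT part of the original file: a full
 * non-consuming pass over one CPU's events. Recording stays disabled
 * between ring_buffer_read_start() and ring_buffer_read_finish(), and
 * the events remain in the buffer for later readers.
 */
static unsigned long __maybe_unused example_iterate(struct ring_buffer *buffer,
						    int cpu)
{
	struct ring_buffer_iter *iter;
	struct ring_buffer_event *event;
	unsigned long count = 0;
	u64 ts;

	iter = ring_buffer_read_start(buffer, cpu);
	if (!iter)
		return 0;

	while ((event = ring_buffer_read(iter, &ts)))
		count++;

	ring_buffer_read_finish(iter);
	return count;
}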
/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
	return BUF_PAGE_SIZE * buffer->pages;
}
static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	cpu_buffer->head_page->size = 0;
	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->tail_page->size = 0;
	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	cpu_buffer->reader_page->size = 0;

	cpu_buffer->head_page->read = 0;
	cpu_buffer->tail_page->write = 0;
	cpu_buffer->reader_page->read = 0;

	cpu_buffer->overrun = 0;
	cpu_buffer->entries = 0;
}
/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	spin_lock_irqsave(&cpu_buffer->lock, flags);

	rb_reset_cpu(cpu_buffer);

	spin_unlock_irqrestore(&cpu_buffer->lock, flags);
}
/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}
/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		if (!rb_per_cpu_empty(cpu_buffer))
			return 0;
	}
	return 1;
}
/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 1;

	cpu_buffer = buffer->buffers[cpu];
	return rb_per_cpu_empty(cpu_buffer);
}
/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;

	if (!cpu_isset(cpu, buffer_a->cpumask) ||
	    !cpu_isset(cpu, buffer_b->cpumask))
		return -EINVAL;

	/* At least make sure the two buffers are somewhat the same */
	if (buffer_a->size != buffer_b->size ||
	    buffer_a->pages != buffer_b->pages)
		return -EINVAL;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	/*
	 * We can't do a synchronize_sched here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);

	return 0;
}
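
/*
 * Illustrative sketch, NOT part of the original file: taking a per-CPU
 * "snapshot" by swapping in a spare buffer of the same size. After the
 * swap, "spare" holds the old live data and can be read at leisure.
 */
static int __maybe_unused example_snapshot_cpu(struct ring_buffer *live,
					       struct ring_buffer *spare,
					       int cpu)
{
	int ret;

	ret = ring_buffer_swap_cpu(live, spare, cpu);
	if (ret < 0)
		return ret;	/* -EINVAL: mismatched sizes or cpumasks */

	return 0;
}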