1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/spinlock.h>
8 #include <linux/debugfs.h>
9 #include <linux/uaccess.h>
10 #include <linux/module.h>
11 #include <linux/percpu.h>
12 #include <linux/mutex.h>
13 #include <linux/sched.h>        /* used for sched_clock() (for now) */
14 #include <linux/init.h>
15 #include <linux/hash.h>
16 #include <linux/list.h>
17 #include <linux/fs.h>
18
19 /* Up this if you want to test the TIME_EXTENTS and normalization */
20 #define DEBUG_SHIFT 0
21
22 /* FIXME!!! */
23 u64 ring_buffer_time_stamp(int cpu)
24 {
25         /* shift to debug/test normalization and TIME_EXTENTS */
26         return sched_clock() << DEBUG_SHIFT;
27 }
28
29 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
30 {
31         /* Just for testing the normalize function and deltas */
32         *ts >>= DEBUG_SHIFT;
33 }
34
35 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
36 #define RB_ALIGNMENT_SHIFT      2
37 #define RB_ALIGNMENT            (1 << RB_ALIGNMENT_SHIFT)
38 #define RB_MAX_SMALL_DATA       28
39
40 enum {
41         RB_LEN_TIME_EXTEND = 8,
42         RB_LEN_TIME_STAMP = 16,
43 };
44
45 /* inline for ring buffer fast paths */
46 static inline unsigned
47 rb_event_length(struct ring_buffer_event *event)
48 {
49         unsigned length;
50
51         switch (event->type) {
52         case RINGBUF_TYPE_PADDING:
53                 /* undefined */
54                 return -1;
55
56         case RINGBUF_TYPE_TIME_EXTEND:
57                 return RB_LEN_TIME_EXTEND;
58
59         case RINGBUF_TYPE_TIME_STAMP:
60                 return RB_LEN_TIME_STAMP;
61
62         case RINGBUF_TYPE_DATA:
63                 if (event->len)
64                         length = event->len << RB_ALIGNMENT_SHIFT;
65                 else
66                         length = event->array[0];
67                 return length + RB_EVNT_HDR_SIZE;
68         default:
69                 BUG();
70         }
71         /* not hit */
72         return 0;
73 }
74
75 /**
76  * ring_buffer_event_length - return the length of the event
77  * @event: the event to get the length of
78  */
79 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
80 {
81         return rb_event_length(event);
82 }
83
84 /* inline for ring buffer fast paths */
85 static inline void *
86 rb_event_data(struct ring_buffer_event *event)
87 {
88         BUG_ON(event->type != RINGBUF_TYPE_DATA);
89         /* If length is in len field, then array[0] has the data */
90         if (event->len)
91                 return (void *)&event->array[0];
92         /* Otherwise length is in array[0] and array[1] has the data */
93         return (void *)&event->array[1];
94 }
95
96 /**
97  * ring_buffer_event_data - return the data of the event
98  * @event: the event to get the data from
99  */
100 void *ring_buffer_event_data(struct ring_buffer_event *event)
101 {
102         return rb_event_data(event);
103 }
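
/*
 * Example (illustrative sketch, not part of the original file): how the two
 * accessors above are typically used on an event returned by one of the
 * read-side interfaces further down.  "event" and "process_payload" are
 * hypothetical.
 *
 *	void *body = ring_buffer_event_data(event);
 *	unsigned len = ring_buffer_event_length(event);
 *
 *	process_payload(body, len);
 *
 * Note that ring_buffer_event_length() returns the total size of the event
 * as stored in the buffer (header and alignment padding included), not the
 * exact payload length, so a consumer that needs the precise payload size
 * must encode it in the payload itself.
 */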
104
105 #define for_each_buffer_cpu(buffer, cpu)                \
106         for_each_cpu_mask(cpu, buffer->cpumask)
107
108 #define TS_SHIFT        27
109 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
110 #define TS_DELTA_TEST   (~TS_MASK)
111
112 /*
113  * This hack is stolen from mm/slob.c.
114  * We can store per page timing information in the page frame of the page.
115  * Thanks to Peter Zijlstra for suggesting this idea.
116  */
117 struct buffer_page {
118         u64              time_stamp;    /* page time stamp */
119         unsigned         size;          /* size of page data */
120         unsigned         write;         /* index for next write */
121         unsigned         read;          /* index for next read */
122         struct list_head list;          /* list of free pages */
123         void *page;                     /* Actual data page */
124 };
125
126 /*
127  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
128  * this issue out.
129  */
130 static inline void free_buffer_page(struct buffer_page *bpage)
131 {
132         if (bpage->page)
133                 __free_page(bpage->page);
134         kfree(bpage);
135 }
136
137 /*
138  * We need to fit the time_stamp delta into 27 bits.
139  */
140 static inline int test_time_stamp(u64 delta)
141 {
142         if (delta & TS_DELTA_TEST)
143                 return 1;
144         return 0;
145 }
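
/*
 * Worked example (illustrative, assuming sched_clock() returns nanoseconds):
 * TS_DELTA_TEST is the complement of the low TS_SHIFT (27) bits, so
 * test_time_stamp() returns 1 for any delta of 2^27 or more.  At one count
 * per nanosecond that is roughly 134 ms: two events on the same CPU that
 * are further apart than that cannot encode their delta in the 27-bit
 * time_delta field, and rb_add_time_stamp() below inserts a
 * RINGBUF_TYPE_TIME_EXTEND event carrying the upper bits instead.
 */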
146
147 #define BUF_PAGE_SIZE PAGE_SIZE
148
149 /*
150  * If head_page == tail_page && head == tail, then the buffer is empty.
151  */
152 struct ring_buffer_per_cpu {
153         int                             cpu;
154         struct ring_buffer              *buffer;
155         spinlock_t                      lock;
156         struct lock_class_key           lock_key;
157         struct list_head                pages;
158         struct buffer_page              *head_page;     /* read from head */
159         struct buffer_page              *tail_page;     /* write to tail */
160         struct buffer_page              *reader_page;
161         unsigned long                   overrun;
162         unsigned long                   entries;
163         u64                             write_stamp;
164         u64                             read_stamp;
165         atomic_t                        record_disabled;
166 };
167
168 struct ring_buffer {
169         unsigned long                   size;
170         unsigned                        pages;
171         unsigned                        flags;
172         int                             cpus;
173         cpumask_t                       cpumask;
174         atomic_t                        record_disabled;
175
176         struct mutex                    mutex;
177
178         struct ring_buffer_per_cpu      **buffers;
179 };
180
181 struct ring_buffer_iter {
182         struct ring_buffer_per_cpu      *cpu_buffer;
183         unsigned long                   head;
184         struct buffer_page              *head_page;
185         u64                             read_stamp;
186 };
187
188 #define RB_WARN_ON(buffer, cond)                        \
189         if (unlikely(cond)) {                           \
190                 atomic_inc(&buffer->record_disabled);   \
191                 WARN_ON(1);                             \
192                 return -1;                              \
193         }
194
195 /**
196  * rb_check_pages - integrity check of buffer pages
197  * @cpu_buffer: CPU buffer with pages to test
198  *
199  * As a safety measure we check to make sure the data pages have not
200  * been corrupted.
201  */
202 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
203 {
204         struct list_head *head = &cpu_buffer->pages;
205         struct buffer_page *page, *tmp;
206
207         RB_WARN_ON(cpu_buffer, head->next->prev != head);
208         RB_WARN_ON(cpu_buffer, head->prev->next != head);
209
210         list_for_each_entry_safe(page, tmp, head, list) {
211                 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
212                 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
213         }
214
215         return 0;
216 }
217
218 static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
219 {
220         return cpu_buffer->head_page->size;
221 }
222
223 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
224                              unsigned nr_pages)
225 {
226         struct list_head *head = &cpu_buffer->pages;
227         struct buffer_page *page, *tmp;
228         unsigned long addr;
229         LIST_HEAD(pages);
230         unsigned i;
231
232         for (i = 0; i < nr_pages; i++) {
233                 page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
234                                     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
235                 if (!page)
236                         goto free_pages;
237                 list_add(&page->list, &pages);
238
239                 addr = __get_free_page(GFP_KERNEL);
240                 if (!addr)
241                         goto free_pages;
242                 page->page = (void *)addr;
243         }
244
245         list_splice(&pages, head);
246
247         rb_check_pages(cpu_buffer);
248
249         return 0;
250
251  free_pages:
252         list_for_each_entry_safe(page, tmp, &pages, list) {
253                 list_del_init(&page->list);
254                 free_buffer_page(page);
255         }
256         return -ENOMEM;
257 }
258
259 static struct ring_buffer_per_cpu *
260 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
261 {
262         struct ring_buffer_per_cpu *cpu_buffer;
263         struct buffer_page *page;
264         unsigned long addr;
265         int ret;
266
267         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
268                                   GFP_KERNEL, cpu_to_node(cpu));
269         if (!cpu_buffer)
270                 return NULL;
271
272         cpu_buffer->cpu = cpu;
273         cpu_buffer->buffer = buffer;
274         spin_lock_init(&cpu_buffer->lock);
275         INIT_LIST_HEAD(&cpu_buffer->pages);
276
277         page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
278                             GFP_KERNEL, cpu_to_node(cpu));
279         if (!page)
280                 goto fail_free_buffer;
281
282         cpu_buffer->reader_page = page;
283         addr = __get_free_page(GFP_KERNEL);
284         if (!addr)
285                 goto fail_free_reader;
286         page->page = (void *)addr;
287
288         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
289         cpu_buffer->reader_page->size = 0;
290
291         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
292         if (ret < 0)
293                 goto fail_free_reader;
294
295         cpu_buffer->head_page
296                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
297         cpu_buffer->tail_page
298                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
299
300         return cpu_buffer;
301
302  fail_free_reader:
303         free_buffer_page(cpu_buffer->reader_page);
304
305  fail_free_buffer:
306         kfree(cpu_buffer);
307         return NULL;
308 }
309
310 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
311 {
312         struct list_head *head = &cpu_buffer->pages;
313         struct buffer_page *page, *tmp;
314
315         list_del_init(&cpu_buffer->reader_page->list);
316         free_buffer_page(cpu_buffer->reader_page);
317
318         list_for_each_entry_safe(page, tmp, head, list) {
319                 list_del_init(&page->list);
320                 free_buffer_page(page);
321         }
322         kfree(cpu_buffer);
323 }
324
325 /*
326  * Causes compile errors if the struct buffer_page gets bigger
327  * than the struct page.
328  */
329 extern int ring_buffer_page_too_big(void);
330
331 /**
332  * ring_buffer_alloc - allocate a new ring_buffer
333  * @size: the size in bytes that is needed.
334  * @flags: attributes to set for the ring buffer.
335  *
336  * Currently the only flag that is available is the RB_FL_OVERWRITE
337  * flag. This flag means that the buffer will overwrite old data
338  * when the buffer wraps. If this flag is not set, the buffer will
339  * drop data when the tail hits the head.
340  */
341 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
342 {
343         struct ring_buffer *buffer;
344         int bsize;
345         int cpu;
346
347         /* Paranoid! Optimizes out when all is well */
348         if (sizeof(struct buffer_page) > sizeof(struct page))
349                 ring_buffer_page_too_big();
350
351
352         /* keep it in its own cache line */
353         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
354                          GFP_KERNEL);
355         if (!buffer)
356                 return NULL;
357
358         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
359         buffer->flags = flags;
360
361         /* need at least two pages */
362         if (buffer->pages == 1)
363                 buffer->pages++;
364
365         buffer->cpumask = cpu_possible_map;
366         buffer->cpus = nr_cpu_ids;
367
368         bsize = sizeof(void *) * nr_cpu_ids;
369         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
370                                   GFP_KERNEL);
371         if (!buffer->buffers)
372                 goto fail_free_buffer;
373
374         for_each_buffer_cpu(buffer, cpu) {
375                 buffer->buffers[cpu] =
376                         rb_allocate_cpu_buffer(buffer, cpu);
377                 if (!buffer->buffers[cpu])
378                         goto fail_free_buffers;
379         }
380
381         mutex_init(&buffer->mutex);
382
383         return buffer;
384
385  fail_free_buffers:
386         for_each_buffer_cpu(buffer, cpu) {
387                 if (buffer->buffers[cpu])
388                         rb_free_cpu_buffer(buffer->buffers[cpu]);
389         }
390         kfree(buffer->buffers);
391
392  fail_free_buffer:
393         kfree(buffer);
394         return NULL;
395 }
396
397 /**
398  * ring_buffer_free - free a ring buffer.
399  * @buffer: the buffer to free.
400  */
401 void
402 ring_buffer_free(struct ring_buffer *buffer)
403 {
404         int cpu;
405
406         for_each_buffer_cpu(buffer, cpu)
407                 rb_free_cpu_buffer(buffer->buffers[cpu]);
408
409         kfree(buffer);
410 }
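
/*
 * Example (illustrative sketch, not part of the original file): allocating
 * a one-megabyte overwriting buffer and tearing it down again.  The buffer
 * pointer "rb" is hypothetical.
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer_alloc(1024 * 1024, RB_FL_OVERWRITE);
 *	if (!rb)
 *		return -ENOMEM;
 *
 *	(... trace into it, read from it ...)
 *
 *	ring_buffer_free(rb);
 */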
411
412 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
413
414 static void
415 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
416 {
417         struct buffer_page *page;
418         struct list_head *p;
419         unsigned i;
420
421         atomic_inc(&cpu_buffer->record_disabled);
422         synchronize_sched();
423
424         for (i = 0; i < nr_pages; i++) {
425                 BUG_ON(list_empty(&cpu_buffer->pages));
426                 p = cpu_buffer->pages.next;
427                 page = list_entry(p, struct buffer_page, list);
428                 list_del_init(&page->list);
429                 free_buffer_page(page);
430         }
431         BUG_ON(list_empty(&cpu_buffer->pages));
432
433         rb_reset_cpu(cpu_buffer);
434
435         rb_check_pages(cpu_buffer);
436
437         atomic_dec(&cpu_buffer->record_disabled);
438
439 }
440
441 static void
442 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
443                 struct list_head *pages, unsigned nr_pages)
444 {
445         struct buffer_page *page;
446         struct list_head *p;
447         unsigned i;
448
449         atomic_inc(&cpu_buffer->record_disabled);
450         synchronize_sched();
451
452         for (i = 0; i < nr_pages; i++) {
453                 BUG_ON(list_empty(pages));
454                 p = pages->next;
455                 page = list_entry(p, struct buffer_page, list);
456                 list_del_init(&page->list);
457                 list_add_tail(&page->list, &cpu_buffer->pages);
458         }
459         rb_reset_cpu(cpu_buffer);
460
461         rb_check_pages(cpu_buffer);
462
463         atomic_dec(&cpu_buffer->record_disabled);
464 }
465
466 /**
467  * ring_buffer_resize - resize the ring buffer
468  * @buffer: the buffer to resize.
469  * @size: the new size.
470  *
471  * The tracer is responsible for making sure that the buffer is
472  * not being used while changing the size.
473  * Note: We may be able to change the above requirement by using
474  *  RCU synchronizations.
475  *
476  * Minimum size is 2 * BUF_PAGE_SIZE.
477  *
478  * Returns the new size on success, or -ENOMEM on failure.
479  */
480 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
481 {
482         struct ring_buffer_per_cpu *cpu_buffer;
483         unsigned nr_pages, rm_pages, new_pages;
484         struct buffer_page *page, *tmp;
485         unsigned long buffer_size;
486         unsigned long addr;
487         LIST_HEAD(pages);
488         int i, cpu;
489
490         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
491         size *= BUF_PAGE_SIZE;
492         buffer_size = buffer->pages * BUF_PAGE_SIZE;
493
494         /* we need a minimum of two pages */
495         if (size < BUF_PAGE_SIZE * 2)
496                 size = BUF_PAGE_SIZE * 2;
497
498         if (size == buffer_size)
499                 return size;
500
501         mutex_lock(&buffer->mutex);
502
503         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
504
505         if (size < buffer_size) {
506
507                 /* easy case, just free pages */
508                 BUG_ON(nr_pages >= buffer->pages);
509
510                 rm_pages = buffer->pages - nr_pages;
511
512                 for_each_buffer_cpu(buffer, cpu) {
513                         cpu_buffer = buffer->buffers[cpu];
514                         rb_remove_pages(cpu_buffer, rm_pages);
515                 }
516                 goto out;
517         }
518
519         /*
520          * This is a bit more difficult. We only want to add pages
521          * when we can allocate enough for all CPUs. We do this
522          * by allocating all the pages and storing them on a local
523  * linked list. If we succeed in our allocation, then we
524          * add these pages to the cpu_buffers. Otherwise we just free
525  * them all and return -ENOMEM.
526          */
527         BUG_ON(nr_pages <= buffer->pages);
528         new_pages = nr_pages - buffer->pages;
529
530         for_each_buffer_cpu(buffer, cpu) {
531                 for (i = 0; i < new_pages; i++) {
532                         page = kzalloc_node(ALIGN(sizeof(*page),
533                                                   cache_line_size()),
534                                             GFP_KERNEL, cpu_to_node(cpu));
535                         if (!page)
536                                 goto free_pages;
537                         list_add(&page->list, &pages);
538                         addr = __get_free_page(GFP_KERNEL);
539                         if (!addr)
540                                 goto free_pages;
541                         page->page = (void *)addr;
542                 }
543         }
544
545         for_each_buffer_cpu(buffer, cpu) {
546                 cpu_buffer = buffer->buffers[cpu];
547                 rb_insert_pages(cpu_buffer, &pages, new_pages);
548         }
549
550         BUG_ON(!list_empty(&pages));
551
552  out:
553         buffer->pages = nr_pages;
554         mutex_unlock(&buffer->mutex);
555
556         return size;
557
558  free_pages:
559         list_for_each_entry_safe(page, tmp, &pages, list) {
560                 list_del_init(&page->list);
561                 free_buffer_page(page);
562         }
563         return -ENOMEM;
564 }
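
/*
 * Example (illustrative sketch, not part of the original file): growing a
 * hypothetical buffer "rb" to two megabytes.  The caller must make sure the
 * buffer is not being written to or read from while this runs.
 *
 *	int ret;
 *
 *	ret = ring_buffer_resize(rb, 2 * 1024 * 1024);
 *	if (ret < 0)
 *		(allocation failed, the old size is kept)
 *	else
 *		(ret is the new, page-rounded size in bytes)
 */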
565
566 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
567 {
568         return cpu_buffer->reader_page->read == cpu_buffer->reader_page->size &&
569                 (cpu_buffer->tail_page == cpu_buffer->reader_page ||
570                  (cpu_buffer->tail_page == cpu_buffer->head_page &&
571                   cpu_buffer->head_page->read ==
572                   cpu_buffer->tail_page->write));
573 }
574
575 static inline int rb_null_event(struct ring_buffer_event *event)
576 {
577         return event->type == RINGBUF_TYPE_PADDING;
578 }
579
580 static inline void *__rb_page_index(struct buffer_page *page, unsigned index)
581 {
582         return page->page + index;
583 }
584
585 static inline struct ring_buffer_event *
586 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
587 {
588         return __rb_page_index(cpu_buffer->reader_page,
589                                cpu_buffer->reader_page->read);
590 }
591
592 static inline struct ring_buffer_event *
593 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
594 {
595         return __rb_page_index(cpu_buffer->head_page,
596                                cpu_buffer->head_page->read);
597 }
598
599 static inline struct ring_buffer_event *
600 rb_iter_head_event(struct ring_buffer_iter *iter)
601 {
602         return __rb_page_index(iter->head_page, iter->head);
603 }
604
605 /*
606  * When the tail hits the head and the buffer is in overwrite mode,
607  * the head jumps to the next page and all content on the previous
608  * page is discarded. But before doing so, we update the overrun
609  * variable of the buffer.
610  */
611 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
612 {
613         struct ring_buffer_event *event;
614         unsigned long head;
615
616         for (head = 0; head < rb_head_size(cpu_buffer);
617              head += rb_event_length(event)) {
618
619                 event = __rb_page_index(cpu_buffer->head_page, head);
620                 BUG_ON(rb_null_event(event));
621                 /* Only count data entries */
622                 if (event->type != RINGBUF_TYPE_DATA)
623                         continue;
624                 cpu_buffer->overrun++;
625                 cpu_buffer->entries--;
626         }
627 }
628
629 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
630                                struct buffer_page **page)
631 {
632         struct list_head *p = (*page)->list.next;
633
634         if (p == &cpu_buffer->pages)
635                 p = p->next;
636
637         *page = list_entry(p, struct buffer_page, list);
638 }
639
640 static inline void
641 rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
642 {
643         cpu_buffer->tail_page->time_stamp = *ts;
644         cpu_buffer->write_stamp = *ts;
645 }
646
647 static void rb_reset_head_page(struct ring_buffer_per_cpu *cpu_buffer)
648 {
649         cpu_buffer->head_page->read = 0;
650 }
651
652 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
653 {
654         cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp;
655         cpu_buffer->reader_page->read = 0;
656 }
657
658 static inline void rb_inc_iter(struct ring_buffer_iter *iter)
659 {
660         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
661
662         /*
663          * The iterator could be on the reader page (it starts there).
664          * But the head could have moved, since the reader was
665          * found. Check for this case and assign the iterator
666          * to the head page instead of next.
667          */
668         if (iter->head_page == cpu_buffer->reader_page)
669                 iter->head_page = cpu_buffer->head_page;
670         else
671                 rb_inc_page(cpu_buffer, &iter->head_page);
672
673         iter->read_stamp = iter->head_page->time_stamp;
674         iter->head = 0;
675 }
676
677 /**
678  * ring_buffer_update_event - update event type and data
679  * @event: the event to update
680  * @type: the type of event
681  * @length: the size of the event field in the ring buffer
682  *
683  * Update the type and data fields of the event. The length
684  * is the actual size that is written to the ring buffer,
685  * and with this, we can determine what to place into the
686  * data field.
687  */
688 static inline void
689 rb_update_event(struct ring_buffer_event *event,
690                          unsigned type, unsigned length)
691 {
692         event->type = type;
693
694         switch (type) {
695
696         case RINGBUF_TYPE_PADDING:
697                 break;
698
699         case RINGBUF_TYPE_TIME_EXTEND:
700                 event->len =
701                         (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
702                         >> RB_ALIGNMENT_SHIFT;
703                 break;
704
705         case RINGBUF_TYPE_TIME_STAMP:
706                 event->len =
707                         (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
708                         >> RB_ALIGNMENT_SHIFT;
709                 break;
710
711         case RINGBUF_TYPE_DATA:
712                 length -= RB_EVNT_HDR_SIZE;
713                 if (length > RB_MAX_SMALL_DATA) {
714                         event->len = 0;
715                         event->array[0] = length;
716                 } else
717                         event->len =
718                                 (length + (RB_ALIGNMENT-1))
719                                 >> RB_ALIGNMENT_SHIFT;
720                 break;
721         default:
722                 BUG();
723         }
724 }
725
726 static inline unsigned rb_calculate_event_length(unsigned length)
727 {
728         struct ring_buffer_event event; /* Used only for sizeof array */
729
730         /* zero length can cause confusion */
731         if (!length)
732                 length = 1;
733
734         if (length > RB_MAX_SMALL_DATA)
735                 length += sizeof(event.array[0]);
736
737         length += RB_EVNT_HDR_SIZE;
738         length = ALIGN(length, RB_ALIGNMENT);
739
740         return length;
741 }
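
/*
 * Worked example (illustrative, assuming the 4-byte event header from
 * linux/ring_buffer.h): for a 10 byte payload the header
 * (RB_EVNT_HDR_SIZE == 4) is added and the sum aligned to RB_ALIGNMENT,
 * so 10 + 4 is rounded up to 16 bytes reserved in the page, and
 * rb_update_event() stores the payload length in the len field as
 * (10 + 3) >> 2 == 3.  For a 100 byte payload (> RB_MAX_SMALL_DATA) an
 * extra length word is accounted for first: 100 + 4 + 4 == 108 bytes,
 * len is set to 0 and array[0] holds 104, so rb_event_length() recovers
 * 104 + 4 == 108.
 */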
742
743 static struct ring_buffer_event *
744 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
745                   unsigned type, unsigned long length, u64 *ts)
746 {
747         struct buffer_page *tail_page, *head_page, *reader_page;
748         unsigned long tail;
749         struct ring_buffer *buffer = cpu_buffer->buffer;
750         struct ring_buffer_event *event;
751
752         tail_page = cpu_buffer->tail_page;
753         tail = cpu_buffer->tail_page->write;
754
755         if (tail + length > BUF_PAGE_SIZE) {
756                 struct buffer_page *next_page = tail_page;
757
758                 spin_lock(&cpu_buffer->lock);
759                 rb_inc_page(cpu_buffer, &next_page);
760
761                 head_page = cpu_buffer->head_page;
762                 reader_page = cpu_buffer->reader_page;
763
764                 /* we grabbed the lock before incrementing */
765                 WARN_ON(next_page == reader_page);
766
767                 if (next_page == head_page) {
768                         if (!(buffer->flags & RB_FL_OVERWRITE)) {
769                                 spin_unlock(&cpu_buffer->lock);
770                                 return NULL;
771                         }
772
773                         /* count overflows */
774                         rb_update_overflow(cpu_buffer);
775
776                         rb_inc_page(cpu_buffer, &head_page);
777                         cpu_buffer->head_page = head_page;
778                         rb_reset_head_page(cpu_buffer);
779                 }
780
781                 if (tail != BUF_PAGE_SIZE) {
782                         event = __rb_page_index(tail_page, tail);
783                         /* page padding */
784                         event->type = RINGBUF_TYPE_PADDING;
785                 }
786
787                 tail_page->size = tail;
788                 tail_page = next_page;
789                 tail_page->size = 0;
790                 tail = 0;
791                 cpu_buffer->tail_page = tail_page;
792                 cpu_buffer->tail_page->write = tail;
793                 rb_add_stamp(cpu_buffer, ts);
794                 spin_unlock(&cpu_buffer->lock);
795         }
796
797         BUG_ON(tail + length > BUF_PAGE_SIZE);
798
799         event = __rb_page_index(tail_page, tail);
800         rb_update_event(event, type, length);
801
802         return event;
803 }
804
805 static int
806 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
807                   u64 *ts, u64 *delta)
808 {
809         struct ring_buffer_event *event;
810         static int once;
811
812         if (unlikely(*delta > (1ULL << 59) && !once++)) {
813                 printk(KERN_WARNING "Delta way too big! %llu"
814                        " ts=%llu write stamp = %llu\n",
815                        *delta, *ts, cpu_buffer->write_stamp);
816                 WARN_ON(1);
817         }
818
819         /*
820          * The delta is too big, we need to add a
821          * new timestamp.
822          */
823         event = __rb_reserve_next(cpu_buffer,
824                                   RINGBUF_TYPE_TIME_EXTEND,
825                                   RB_LEN_TIME_EXTEND,
826                                   ts);
827         if (!event)
828                 return -1;
829
830         /* check to see if we went to the next page */
831         if (cpu_buffer->tail_page->write) {
832                 /* Still on same page, update timestamp */
833                 event->time_delta = *delta & TS_MASK;
834                 event->array[0] = *delta >> TS_SHIFT;
835                 /* commit the time event */
836                 cpu_buffer->tail_page->write +=
837                         rb_event_length(event);
838                 cpu_buffer->write_stamp = *ts;
839                 *delta = 0;
840         }
841
842         return 0;
843 }
844
845 static struct ring_buffer_event *
846 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
847                       unsigned type, unsigned long length)
848 {
849         struct ring_buffer_event *event;
850         u64 ts, delta;
851
852         ts = ring_buffer_time_stamp(cpu_buffer->cpu);
853
854         if (cpu_buffer->tail_page->write) {
855                 delta = ts - cpu_buffer->write_stamp;
856
857                 if (test_time_stamp(delta)) {
858                         int ret;
859
860                         ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
861                         if (ret < 0)
862                                 return NULL;
863                 }
864         } else {
865                 spin_lock(&cpu_buffer->lock);
866                 rb_add_stamp(cpu_buffer, &ts);
867                 spin_unlock(&cpu_buffer->lock);
868                 delta = 0;
869         }
870
871         event = __rb_reserve_next(cpu_buffer, type, length, &ts);
872         if (!event)
873                 return NULL;
874
875         /* If the reserve went to the next page, our delta is zero */
876         if (!cpu_buffer->tail_page->write)
877                 delta = 0;
878
879         event->time_delta = delta;
880
881         return event;
882 }
883
884 /**
885  * ring_buffer_lock_reserve - reserve a part of the buffer
886  * @buffer: the ring buffer to reserve from
887  * @length: the length of the data to reserve (excluding event header)
888  * @flags: a pointer to save the interrupt flags
889  *
890  * Returns a reserved event on the ring buffer to copy directly to.
891  * The user of this interface will need to get the body to write into
892  * and can use the ring_buffer_event_data() interface.
893  *
894  * The length is the length of the data needed, not the event length
895  * which also includes the event header.
896  *
897  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
898  * If NULL is returned, then nothing has been allocated or locked.
899  */
900 struct ring_buffer_event *
901 ring_buffer_lock_reserve(struct ring_buffer *buffer,
902                          unsigned long length,
903                          unsigned long *flags)
904 {
905         struct ring_buffer_per_cpu *cpu_buffer;
906         struct ring_buffer_event *event;
907         int cpu;
908
909         if (atomic_read(&buffer->record_disabled))
910                 return NULL;
911
912         local_irq_save(*flags);
913         cpu = raw_smp_processor_id();
914
915         if (!cpu_isset(cpu, buffer->cpumask))
916                 goto out;
917
918         cpu_buffer = buffer->buffers[cpu];
919
920         if (atomic_read(&cpu_buffer->record_disabled))
921                 goto out;
922
923         length = rb_calculate_event_length(length);
924         if (length > BUF_PAGE_SIZE)
925                 return NULL;
926
927         event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
928         if (!event)
929                 goto out;
930
931         return event;
932
933  out:
934         local_irq_restore(*flags);
935         return NULL;
936 }
937
938 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
939                       struct ring_buffer_event *event)
940 {
941         cpu_buffer->tail_page->write += rb_event_length(event);
942         cpu_buffer->tail_page->size = cpu_buffer->tail_page->write;
943         cpu_buffer->write_stamp += event->time_delta;
944         cpu_buffer->entries++;
945 }
946
947 /**
948  * ring_buffer_unlock_commit - commit a reserved event
949  * @buffer: The buffer to commit to
950  * @event: The event pointer to commit.
951  * @flags: the interrupt flags received from ring_buffer_lock_reserve.
952  *
953  * This commits the data to the ring buffer, and releases any locks held.
954  *
955  * Must be paired with ring_buffer_lock_reserve.
956  */
957 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
958                               struct ring_buffer_event *event,
959                               unsigned long flags)
960 {
961         struct ring_buffer_per_cpu *cpu_buffer;
962         int cpu = raw_smp_processor_id();
963
964         cpu_buffer = buffer->buffers[cpu];
965
966         rb_commit(cpu_buffer, event);
967
968         local_irq_restore(flags);
969
970         return 0;
971 }
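
/*
 * Example (illustrative sketch, not part of the original file): the
 * reserve/commit pair is used roughly like this.  "rb", "struct my_entry"
 * and "fill_entry" are hypothetical; only the payload size is passed to
 * the reserve call, the event header is accounted for internally.
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *	unsigned long flags;
 *
 *	event = ring_buffer_lock_reserve(rb, sizeof(*entry), &flags);
 *	if (!event)
 *		return;		(nothing was reserved)
 *	entry = ring_buffer_event_data(event);
 *	fill_entry(entry);
 *	ring_buffer_unlock_commit(rb, event, flags);
 */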
972
973 /**
974  * ring_buffer_write - write data to the buffer without reserving
975  * @buffer: The ring buffer to write to.
976  * @length: The length of the data being written (excluding the event header)
977  * @data: The data to write to the buffer.
978  *
979  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
980  * one function. If you already have the data to write to the buffer, it
981  * may be easier to simply call this function.
982  *
983  * Note, like ring_buffer_lock_reserve, the length is the length of the data
984  * and not the length of the event which would hold the header.
985  */
986 int ring_buffer_write(struct ring_buffer *buffer,
987                         unsigned long length,
988                         void *data)
989 {
990         struct ring_buffer_per_cpu *cpu_buffer;
991         struct ring_buffer_event *event;
992         unsigned long event_length, flags;
993         void *body;
994         int ret = -EBUSY;
995         int cpu;
996
997         if (atomic_read(&buffer->record_disabled))
998                 return -EBUSY;
999
1000         local_irq_save(flags);
1001         cpu = raw_smp_processor_id();
1002
1003         if (!cpu_isset(cpu, buffer->cpumask))
1004                 goto out;
1005
1006         cpu_buffer = buffer->buffers[cpu];
1007
1008         if (atomic_read(&cpu_buffer->record_disabled))
1009                 goto out;
1010
1011         event_length = rb_calculate_event_length(length);
1012         event = rb_reserve_next_event(cpu_buffer,
1013                                       RINGBUF_TYPE_DATA, event_length);
1014         if (!event)
1015                 goto out;
1016
1017         body = rb_event_data(event);
1018
1019         memcpy(body, data, length);
1020
1021         rb_commit(cpu_buffer, event);
1022
1023         ret = 0;
1024  out:
1025         local_irq_restore(flags);
1026
1027         return ret;
1028 }
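
/*
 * Example (illustrative sketch, not part of the original file): writing an
 * already formatted blob into a hypothetical buffer "rb" in one call.
 *
 *	char msg[64];
 *	int ret;
 *
 *	snprintf(msg, sizeof(msg), "hello from cpu %d",
 *		 raw_smp_processor_id());
 *	ret = ring_buffer_write(rb, strlen(msg) + 1, msg);
 *	if (ret)
 *		(the write was dropped: buffer full or recording disabled)
 */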
1029
1030 /**
1031  * ring_buffer_record_disable - stop all writes into the buffer
1032  * @buffer: The ring buffer to stop writes to.
1033  *
1034  * This prevents all writes to the buffer. Any attempt to write
1035  * to the buffer after this will fail and return NULL.
1036  *
1037  * The caller should call synchronize_sched() after this.
1038  */
1039 void ring_buffer_record_disable(struct ring_buffer *buffer)
1040 {
1041         atomic_inc(&buffer->record_disabled);
1042 }
1043
1044 /**
1045  * ring_buffer_record_enable - enable writes to the buffer
1046  * @buffer: The ring buffer to enable writes
1047  *
1048  * Note, multiple disables will need the same number of enables
1049  * to truly enable the writing (much like preempt_disable).
1050  */
1051 void ring_buffer_record_enable(struct ring_buffer *buffer)
1052 {
1053         atomic_dec(&buffer->record_disabled);
1054 }
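
/*
 * Example (illustrative sketch, not part of the original file): the
 * intended disable/enable pattern around a global, non-consuming read of a
 * hypothetical buffer "rb".
 *
 *	ring_buffer_record_disable(rb);
 *	synchronize_sched();		(wait for writers already in flight)
 *
 *	(... walk the buffer with the iterator interface below ...)
 *
 *	ring_buffer_record_enable(rb);
 */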
1055
1056 /**
1057  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1058  * @buffer: The ring buffer to stop writes to.
1059  * @cpu: The CPU buffer to stop
1060  *
1061  * This prevents all writes to the buffer. Any attempt to write
1062  * to the buffer after this will fail and return NULL.
1063  *
1064  * The caller should call synchronize_sched() after this.
1065  */
1066 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1067 {
1068         struct ring_buffer_per_cpu *cpu_buffer;
1069
1070         if (!cpu_isset(cpu, buffer->cpumask))
1071                 return;
1072
1073         cpu_buffer = buffer->buffers[cpu];
1074         atomic_inc(&cpu_buffer->record_disabled);
1075 }
1076
1077 /**
1078  * ring_buffer_record_enable_cpu - enable writes to the buffer
1079  * @buffer: The ring buffer to enable writes
1080  * @cpu: The CPU to enable.
1081  *
1082  * Note, multiple disables will need the same number of enables
1083  * to truly enable the writing (much like preempt_disable).
1084  */
1085 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1086 {
1087         struct ring_buffer_per_cpu *cpu_buffer;
1088
1089         if (!cpu_isset(cpu, buffer->cpumask))
1090                 return;
1091
1092         cpu_buffer = buffer->buffers[cpu];
1093         atomic_dec(&cpu_buffer->record_disabled);
1094 }
1095
1096 /**
1097  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1098  * @buffer: The ring buffer
1099  * @cpu: The per CPU buffer to get the entries from.
1100  */
1101 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1102 {
1103         struct ring_buffer_per_cpu *cpu_buffer;
1104
1105         if (!cpu_isset(cpu, buffer->cpumask))
1106                 return 0;
1107
1108         cpu_buffer = buffer->buffers[cpu];
1109         return cpu_buffer->entries;
1110 }
1111
1112 /**
1113  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1114  * @buffer: The ring buffer
1115  * @cpu: The per CPU buffer to get the number of overruns from
1116  */
1117 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1118 {
1119         struct ring_buffer_per_cpu *cpu_buffer;
1120
1121         if (!cpu_isset(cpu, buffer->cpumask))
1122                 return 0;
1123
1124         cpu_buffer = buffer->buffers[cpu];
1125         return cpu_buffer->overrun;
1126 }
1127
1128 /**
1129  * ring_buffer_entries - get the number of entries in a buffer
1130  * @buffer: The ring buffer
1131  *
1132  * Returns the total number of entries in the ring buffer
1133  * (all CPU entries)
1134  */
1135 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1136 {
1137         struct ring_buffer_per_cpu *cpu_buffer;
1138         unsigned long entries = 0;
1139         int cpu;
1140
1141         /* if you care about this being correct, lock the buffer */
1142         for_each_buffer_cpu(buffer, cpu) {
1143                 cpu_buffer = buffer->buffers[cpu];
1144                 entries += cpu_buffer->entries;
1145         }
1146
1147         return entries;
1148 }
1149
1150 /**
1151  * ring_buffer_overruns - get the number of overruns in the buffer
1152  * @buffer: The ring buffer
1153  *
1154  * Returns the total number of overruns in the ring buffer
1155  * (all CPU entries)
1156  */
1157 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1158 {
1159         struct ring_buffer_per_cpu *cpu_buffer;
1160         unsigned long overruns = 0;
1161         int cpu;
1162
1163         /* if you care about this being correct, lock the buffer */
1164         for_each_buffer_cpu(buffer, cpu) {
1165                 cpu_buffer = buffer->buffers[cpu];
1166                 overruns += cpu_buffer->overrun;
1167         }
1168
1169         return overruns;
1170 }
1171
1172 /**
1173  * ring_buffer_iter_reset - reset an iterator
1174  * @iter: The iterator to reset
1175  *
1176  * Resets the iterator, so that it will start from the beginning
1177  * again.
1178  */
1179 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1180 {
1181         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1182
1183         /* Iterator usage is expected to have record disabled */
1184         if (list_empty(&cpu_buffer->reader_page->list)) {
1185                 iter->head_page = cpu_buffer->head_page;
1186                 iter->head = cpu_buffer->head_page->read;
1187         } else {
1188                 iter->head_page = cpu_buffer->reader_page;
1189                 iter->head = cpu_buffer->reader_page->read;
1190         }
1191         if (iter->head)
1192                 iter->read_stamp = cpu_buffer->read_stamp;
1193         else
1194                 iter->read_stamp = iter->head_page->time_stamp;
1195 }
1196
1197 /**
1198  * ring_buffer_iter_empty - check if an iterator has no more to read
1199  * @iter: The iterator to check
1200  */
1201 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1202 {
1203         struct ring_buffer_per_cpu *cpu_buffer;
1204
1205         cpu_buffer = iter->cpu_buffer;
1206
1207         return iter->head_page == cpu_buffer->tail_page &&
1208                 iter->head == cpu_buffer->tail_page->write;
1209 }
1210
1211 static void
1212 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1213                      struct ring_buffer_event *event)
1214 {
1215         u64 delta;
1216
1217         switch (event->type) {
1218         case RINGBUF_TYPE_PADDING:
1219                 return;
1220
1221         case RINGBUF_TYPE_TIME_EXTEND:
1222                 delta = event->array[0];
1223                 delta <<= TS_SHIFT;
1224                 delta += event->time_delta;
1225                 cpu_buffer->read_stamp += delta;
1226                 return;
1227
1228         case RINGBUF_TYPE_TIME_STAMP:
1229                 /* FIXME: not implemented */
1230                 return;
1231
1232         case RINGBUF_TYPE_DATA:
1233                 cpu_buffer->read_stamp += event->time_delta;
1234                 return;
1235
1236         default:
1237                 BUG();
1238         }
1239         return;
1240 }
1241
1242 static void
1243 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1244                           struct ring_buffer_event *event)
1245 {
1246         u64 delta;
1247
1248         switch (event->type) {
1249         case RINGBUF_TYPE_PADDING:
1250                 return;
1251
1252         case RINGBUF_TYPE_TIME_EXTEND:
1253                 delta = event->array[0];
1254                 delta <<= TS_SHIFT;
1255                 delta += event->time_delta;
1256                 iter->read_stamp += delta;
1257                 return;
1258
1259         case RINGBUF_TYPE_TIME_STAMP:
1260                 /* FIXME: not implemented */
1261                 return;
1262
1263         case RINGBUF_TYPE_DATA:
1264                 iter->read_stamp += event->time_delta;
1265                 return;
1266
1267         default:
1268                 BUG();
1269         }
1270         return;
1271 }
1272
1273 static struct buffer_page *
1274 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1275 {
1276         struct buffer_page *reader = NULL;
1277         unsigned long flags;
1278
1279         spin_lock_irqsave(&cpu_buffer->lock, flags);
1280
1281  again:
1282         reader = cpu_buffer->reader_page;
1283
1284         /* If there's more to read, return this page */
1285         if (cpu_buffer->reader_page->read < reader->size)
1286                 goto out;
1287
1288         /* Never should we have an index greater than the size */
1289         WARN_ON(cpu_buffer->reader_page->read > reader->size);
1290
1291         /* check if we caught up to the tail */
1292         reader = NULL;
1293         if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1294                 goto out;
1295
1296         /*
1297          * Splice the empty reader page into the list around the head.
1298          * Reset the reader page to size zero.
1299          */
1300
1301         reader = cpu_buffer->head_page;
1302         cpu_buffer->reader_page->list.next = reader->list.next;
1303         cpu_buffer->reader_page->list.prev = reader->list.prev;
1304         cpu_buffer->reader_page->size = 0;
1305
1306         /* Make the reader page now replace the head */
1307         reader->list.prev->next = &cpu_buffer->reader_page->list;
1308         reader->list.next->prev = &cpu_buffer->reader_page->list;
1309
1310         /*
1311          * If the tail is on the reader, then we must set the head
1312          * to the inserted page, otherwise we set it one before.
1313          */
1314         cpu_buffer->head_page = cpu_buffer->reader_page;
1315
1316         if (cpu_buffer->tail_page != reader)
1317                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1318
1319         /* Finally update the reader page to the new head */
1320         cpu_buffer->reader_page = reader;
1321         rb_reset_reader_page(cpu_buffer);
1322
1323         goto again;
1324
1325  out:
1326         spin_unlock_irqrestore(&cpu_buffer->lock, flags);
1327
1328         return reader;
1329 }
1330
1331 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
1332 {
1333         struct ring_buffer_event *event;
1334         struct buffer_page *reader;
1335         unsigned length;
1336
1337         reader = rb_get_reader_page(cpu_buffer);
1338
1339         /* This function should not be called when buffer is empty */
1340         BUG_ON(!reader);
1341
1342         event = rb_reader_event(cpu_buffer);
1343
1344         if (event->type == RINGBUF_TYPE_DATA)
1345                 cpu_buffer->entries--;
1346
1347         rb_update_read_stamp(cpu_buffer, event);
1348
1349         length = rb_event_length(event);
1350         cpu_buffer->reader_page->read += length;
1351 }
1352
1353 static void rb_advance_iter(struct ring_buffer_iter *iter)
1354 {
1355         struct ring_buffer *buffer;
1356         struct ring_buffer_per_cpu *cpu_buffer;
1357         struct ring_buffer_event *event;
1358         unsigned length;
1359
1360         cpu_buffer = iter->cpu_buffer;
1361         buffer = cpu_buffer->buffer;
1362
1363         /*
1364          * Check if we are at the end of the buffer.
1365          */
1366         if (iter->head >= iter->head_page->size) {
1367                 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1368                 rb_inc_iter(iter);
1369                 return;
1370         }
1371
1372         event = rb_iter_head_event(iter);
1373
1374         length = rb_event_length(event);
1375
1376         /*
1377          * This should not be called to advance the header if we are
1378          * at the tail of the buffer.
1379          */
1380         BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1381                (iter->head + length > cpu_buffer->tail_page->write));
1382
1383         rb_update_iter_read_stamp(iter, event);
1384
1385         iter->head += length;
1386
1387         /* check for end of page padding */
1388         if ((iter->head >= iter->head_page->size) &&
1389             (iter->head_page != cpu_buffer->tail_page))
1390                 rb_advance_iter(iter);
1391 }
1392
1393 /**
1394  * ring_buffer_peek - peek at the next event to be read
1395  * @buffer: The ring buffer to read
1396  * @cpu: The cpu to peek at
1397  * @ts: The timestamp counter of this event.
1398  *
1399  * This will return the event that will be read next, but does
1400  * not consume the data.
1401  */
1402 struct ring_buffer_event *
1403 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1404 {
1405         struct ring_buffer_per_cpu *cpu_buffer;
1406         struct ring_buffer_event *event;
1407         struct buffer_page *reader;
1408
1409         if (!cpu_isset(cpu, buffer->cpumask))
1410                 return NULL;
1411
1412         cpu_buffer = buffer->buffers[cpu];
1413
1414  again:
1415         reader = rb_get_reader_page(cpu_buffer);
1416         if (!reader)
1417                 return NULL;
1418
1419         event = rb_reader_event(cpu_buffer);
1420
1421         switch (event->type) {
1422         case RINGBUF_TYPE_PADDING:
1423                 WARN_ON(1);
1424                 rb_advance_reader(cpu_buffer);
1425                 return NULL;
1426
1427         case RINGBUF_TYPE_TIME_EXTEND:
1428                 /* Internal data, OK to advance */
1429                 rb_advance_reader(cpu_buffer);
1430                 goto again;
1431
1432         case RINGBUF_TYPE_TIME_STAMP:
1433                 /* FIXME: not implemented */
1434                 rb_advance_reader(cpu_buffer);
1435                 goto again;
1436
1437         case RINGBUF_TYPE_DATA:
1438                 if (ts) {
1439                         *ts = cpu_buffer->read_stamp + event->time_delta;
1440                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1441                 }
1442                 return event;
1443
1444         default:
1445                 BUG();
1446         }
1447
1448         return NULL;
1449 }
1450
1451 /**
1452  * ring_buffer_iter_peek - peek at the next event to be read
1453  * @iter: The ring buffer iterator
1454  * @ts: The timestamp counter of this event.
1455  *
1456  * This will return the event that will be read next, but does
1457  * not increment the iterator.
1458  */
1459 struct ring_buffer_event *
1460 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1461 {
1462         struct ring_buffer *buffer;
1463         struct ring_buffer_per_cpu *cpu_buffer;
1464         struct ring_buffer_event *event;
1465
1466         if (ring_buffer_iter_empty(iter))
1467                 return NULL;
1468
1469         cpu_buffer = iter->cpu_buffer;
1470         buffer = cpu_buffer->buffer;
1471
1472  again:
1473         if (rb_per_cpu_empty(cpu_buffer))
1474                 return NULL;
1475
1476         event = rb_iter_head_event(iter);
1477
1478         switch (event->type) {
1479         case RINGBUF_TYPE_PADDING:
1480                 rb_inc_iter(iter);
1481                 goto again;
1482
1483         case RINGBUF_TYPE_TIME_EXTEND:
1484                 /* Internal data, OK to advance */
1485                 rb_advance_iter(iter);
1486                 goto again;
1487
1488         case RINGBUF_TYPE_TIME_STAMP:
1489                 /* FIXME: not implemented */
1490                 rb_advance_iter(iter);
1491                 goto again;
1492
1493         case RINGBUF_TYPE_DATA:
1494                 if (ts) {
1495                         *ts = iter->read_stamp + event->time_delta;
1496                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1497                 }
1498                 return event;
1499
1500         default:
1501                 BUG();
1502         }
1503
1504         return NULL;
1505 }
1506
1507 /**
1508  * ring_buffer_consume - return an event and consume it
1509  * @buffer: The ring buffer to get the next event from
1510  *
1511  * Returns the next event in the ring buffer, and that event is consumed.
1512  * Meaning that sequential reads will keep returning a different event,
1513  * and eventually empty the ring buffer if the producer is slower.
1514  */
1515 struct ring_buffer_event *
1516 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1517 {
1518         struct ring_buffer_per_cpu *cpu_buffer;
1519         struct ring_buffer_event *event;
1520
1521         if (!cpu_isset(cpu, buffer->cpumask))
1522                 return NULL;
1523
1524         event = ring_buffer_peek(buffer, cpu, ts);
1525         if (!event)
1526                 return NULL;
1527
1528         cpu_buffer = buffer->buffers[cpu];
1529         rb_advance_reader(cpu_buffer);
1530
1531         return event;
1532 }
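
/*
 * Example (illustrative sketch, not part of the original file): draining
 * one CPU of a hypothetical buffer "rb" with the consuming interface.
 * "handle_event" is a hypothetical callback.
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(rb, cpu, &ts)) != NULL)
 *		handle_event(ring_buffer_event_data(event),
 *			     ring_buffer_event_length(event), ts);
 */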
1533
1534 /**
1535  * ring_buffer_read_start - start a non consuming read of the buffer
1536  * @buffer: The ring buffer to read from
1537  * @cpu: The cpu buffer to iterate over
1538  *
1539  * This starts up an iteration through the buffer. It also disables
1540  * the recording to the buffer until the reading is finished.
1541  * This prevents the reading from being corrupted. This is not
1542  * a consuming read, so a producer is not expected.
1543  *
1544  * Must be paired with ring_buffer_read_finish.
1545  */
1546 struct ring_buffer_iter *
1547 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1548 {
1549         struct ring_buffer_per_cpu *cpu_buffer;
1550         struct ring_buffer_iter *iter;
1551         unsigned long flags;
1552
1553         if (!cpu_isset(cpu, buffer->cpumask))
1554                 return NULL;
1555
1556         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1557         if (!iter)
1558                 return NULL;
1559
1560         cpu_buffer = buffer->buffers[cpu];
1561
1562         iter->cpu_buffer = cpu_buffer;
1563
1564         atomic_inc(&cpu_buffer->record_disabled);
1565         synchronize_sched();
1566
1567         spin_lock_irqsave(&cpu_buffer->lock, flags);
1568         ring_buffer_iter_reset(iter);
1569         spin_unlock_irqrestore(&cpu_buffer->lock, flags);
1570
1571         return iter;
1572 }
1573
1574 /**
1575  * ring_buffer_finish - finish reading the iterator of the buffer
1576  * @iter: The iterator retrieved by ring_buffer_read_start
1577  *
1578  * This re-enables the recording to the buffer, and frees the
1579  * iterator.
1580  */
1581 void
1582 ring_buffer_read_finish(struct ring_buffer_iter *iter)
1583 {
1584         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1585
1586         atomic_dec(&cpu_buffer->record_disabled);
1587         kfree(iter);
1588 }
1589
1590 /**
1591  * ring_buffer_read - read the next item in the ring buffer by the iterator
1592  * @iter: The ring buffer iterator
1593  * @ts: The time stamp of the event read.
1594  *
1595  * This reads the next event in the ring buffer and increments the iterator.
1596  */
1597 struct ring_buffer_event *
1598 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1599 {
1600         struct ring_buffer_event *event;
1601
1602         event = ring_buffer_iter_peek(iter, ts);
1603         if (!event)
1604                 return NULL;
1605
1606         rb_advance_iter(iter);
1607
1608         return event;
1609 }
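
/*
 * Example (illustrative sketch, not part of the original file): a
 * non-consuming walk over one CPU of a hypothetical buffer "rb".
 * Recording to that CPU buffer stays disabled between read_start and
 * read_finish.  "handle_event" is a hypothetical callback.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_start(rb, cpu);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_read(iter, &ts)) != NULL)
 *		handle_event(ring_buffer_event_data(event),
 *			     ring_buffer_event_length(event), ts);
 *	ring_buffer_read_finish(iter);
 */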
1610
1611 /**
1612  * ring_buffer_size - return the size of the ring buffer (in bytes)
1613  * @buffer: The ring buffer.
1614  */
1615 unsigned long ring_buffer_size(struct ring_buffer *buffer)
1616 {
1617         return BUF_PAGE_SIZE * buffer->pages;
1618 }
1619
1620 static void
1621 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1622 {
1623         cpu_buffer->head_page
1624                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1625         cpu_buffer->head_page->size = 0;
1626         cpu_buffer->tail_page = cpu_buffer->head_page;
1627         cpu_buffer->tail_page->size = 0;
1628         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1629         cpu_buffer->reader_page->size = 0;
1630
1631         cpu_buffer->head_page->read = 0;
1632         cpu_buffer->tail_page->write = 0;
1633         cpu_buffer->reader_page->read = 0;
1634
1635         cpu_buffer->overrun = 0;
1636         cpu_buffer->entries = 0;
1637 }
1638
1639 /**
1640  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1641  * @buffer: The ring buffer to reset a per cpu buffer of
1642  * @cpu: The CPU buffer to be reset
1643  */
1644 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1645 {
1646         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1647         unsigned long flags;
1648
1649         if (!cpu_isset(cpu, buffer->cpumask))
1650                 return;
1651
1652         spin_lock_irqsave(&cpu_buffer->lock, flags);
1653
1654         rb_reset_cpu(cpu_buffer);
1655
1656         spin_unlock_irqrestore(&cpu_buffer->lock, flags);
1657 }
1658
1659 /**
1660  * ring_buffer_reset - reset a ring buffer
1661  * @buffer: The ring buffer whose per-cpu buffers are to be reset
1662  */
1663 void ring_buffer_reset(struct ring_buffer *buffer)
1664 {
1665         int cpu;
1666
1667         for_each_buffer_cpu(buffer, cpu)
1668                 ring_buffer_reset_cpu(buffer, cpu);
1669 }
1670
1671 /**
1672  * ring_buffer_empty - is the ring buffer empty?
1673  * @buffer: The ring buffer to test
1674  */
1675 int ring_buffer_empty(struct ring_buffer *buffer)
1676 {
1677         struct ring_buffer_per_cpu *cpu_buffer;
1678         int cpu;
1679
1680         /* yes this is racy, but if you don't like the race, lock the buffer */
1681         for_each_buffer_cpu(buffer, cpu) {
1682                 cpu_buffer = buffer->buffers[cpu];
1683                 if (!rb_per_cpu_empty(cpu_buffer))
1684                         return 0;
1685         }
1686         return 1;
1687 }
1688
1689 /**
1690  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1691  * @buffer: The ring buffer
1692  * @cpu: The CPU buffer to test
1693  */
1694 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1695 {
1696         struct ring_buffer_per_cpu *cpu_buffer;
1697
1698         if (!cpu_isset(cpu, buffer->cpumask))
1699                 return 1;
1700
1701         cpu_buffer = buffer->buffers[cpu];
1702         return rb_per_cpu_empty(cpu_buffer);
1703 }
1704
1705 /**
1706  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1707  * @buffer_a: One buffer to swap with
1708  * @buffer_b: The other buffer to swap with
1709  *
1710  * This function is useful for tracers that want to take a "snapshot"
1711  * of a CPU buffer and have another backup buffer lying around.
1712  * It is expected that the tracer handles the cpu buffer not being
1713  * used at the moment.
1714  */
1715 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1716                          struct ring_buffer *buffer_b, int cpu)
1717 {
1718         struct ring_buffer_per_cpu *cpu_buffer_a;
1719         struct ring_buffer_per_cpu *cpu_buffer_b;
1720
1721         if (!cpu_isset(cpu, buffer_a->cpumask) ||
1722             !cpu_isset(cpu, buffer_b->cpumask))
1723                 return -EINVAL;
1724
1725         /* At least make sure the two buffers are somewhat the same */
1726         if (buffer_a->size != buffer_b->size ||
1727             buffer_a->pages != buffer_b->pages)
1728                 return -EINVAL;
1729
1730         cpu_buffer_a = buffer_a->buffers[cpu];
1731         cpu_buffer_b = buffer_b->buffers[cpu];
1732
1733         /*
1734          * We can't do a synchronize_sched here because this
1735          * function can be called in atomic context.
1736          * Normally this will be called from the same CPU as cpu.
1737          * If not it's up to the caller to protect this.
1738          */
1739         atomic_inc(&cpu_buffer_a->record_disabled);
1740         atomic_inc(&cpu_buffer_b->record_disabled);
1741
1742         buffer_a->buffers[cpu] = cpu_buffer_b;
1743         buffer_b->buffers[cpu] = cpu_buffer_a;
1744
1745         cpu_buffer_b->buffer = buffer_a;
1746         cpu_buffer_a->buffer = buffer_b;
1747
1748         atomic_dec(&cpu_buffer_a->record_disabled);
1749         atomic_dec(&cpu_buffer_b->record_disabled);
1750
1751         return 0;
1752 }
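
/*
 * Example (illustrative sketch, not part of the original file): taking a
 * snapshot of one CPU by swapping its buffer with an equally sized spare.
 * "live" and "spare" are hypothetical ring buffers created with the same
 * size and page count.
 *
 *	if (ring_buffer_swap_cpu(spare, live, cpu) == 0) {
 *		(the events recorded so far on "cpu" are now reachable
 *		 through "spare" and can be read at leisure, while "live"
 *		 continues to record new events into the old spare pages)
 *	}
 */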
1753