#include <linux/init.h>
 #include <linux/hash.h>
 #include <linux/list.h>
+#include <linux/cpu.h>
 #include <linux/fs.h>
 
 #include "trace.h"
        struct mutex                    mutex;
 
        struct ring_buffer_per_cpu      **buffers;
+
+#ifdef CONFIG_HOTPLUG_CPU
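+       /* registered with the CPU hotplug code; see rb_cpu_notify() */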
+       struct notifier_block           cpu_notify;
+#endif
 };
 
 struct ring_buffer_iter {
  */
 extern int ring_buffer_page_too_big(void);
 
+#ifdef CONFIG_HOTPLUG_CPU
+static int __cpuinit rb_cpu_notify(struct notifier_block *self,
+                                  unsigned long action, void *hcpu);
+#endif
+
 /**
  * ring_buffer_alloc - allocate a new ring_buffer
  * @size: the size in bytes per cpu that is needed.
        if (buffer->pages == 1)
                buffer->pages++;
 
-       cpumask_copy(buffer->cpumask, cpu_possible_mask);
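+       /*
+        * Buffers are allocated only for CPUs that are online right now;
+        * CPUs brought up later are handled by the hotplug notifier
+        * registered further down.
+        */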
+       get_online_cpus();
+       cpumask_copy(buffer->cpumask, cpu_online_mask);
        buffer->cpus = nr_cpu_ids;
 
        bsize = sizeof(void *) * nr_cpu_ids;
                        goto fail_free_buffers;
        }
 
+#ifdef CONFIG_HOTPLUG_CPU
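+       /*
+        * Still holding the hotplug lock here, so no CPU can come online
+        * between the cpumask snapshot above and the notifier going live.
+        */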
+       buffer->cpu_notify.notifier_call = rb_cpu_notify;
+       buffer->cpu_notify.priority = 0;
+       register_cpu_notifier(&buffer->cpu_notify);
+#endif
+
+       put_online_cpus();
        mutex_init(&buffer->mutex);
 
        return buffer;
 
  fail_free_cpumask:
        free_cpumask_var(buffer->cpumask);
+       put_online_cpus();
 
  fail_free_buffer:
        kfree(buffer);
 {
        int cpu;
 
+       get_online_cpus();
+
+#ifdef CONFIG_HOTPLUG_CPU
+       unregister_cpu_notifier(&buffer->cpu_notify);
+#endif
+
        for_each_buffer_cpu(buffer, cpu)
                rb_free_cpu_buffer(buffer->buffers[cpu]);
 
+       put_online_cpus();
+
        free_cpumask_var(buffer->cpumask);
 
        kfree(buffer);
                return size;
 
        mutex_lock(&buffer->mutex);
+       get_online_cpus();
 
        nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
 
        if (size < buffer_size) {
 
                /* easy case, just free pages */
-               if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) {
-                       mutex_unlock(&buffer->mutex);
-                       return -1;
-               }
+               if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
+                       goto out_fail;
 
                rm_pages = buffer->pages - nr_pages;
 
         * add these pages to the cpu_buffers. Otherwise we just free
         * them all and return -ENOMEM;
         */
-       if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) {
-               mutex_unlock(&buffer->mutex);
-               return -1;
-       }
+       if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
+               goto out_fail;
 
        new_pages = nr_pages - buffer->pages;
 
                rb_insert_pages(cpu_buffer, &pages, new_pages);
        }
 
-       if (RB_WARN_ON(buffer, !list_empty(&pages))) {
-               mutex_unlock(&buffer->mutex);
-               return -1;
-       }
+       if (RB_WARN_ON(buffer, !list_empty(&pages)))
+               goto out_fail;
 
  out:
        buffer->pages = nr_pages;
+       put_online_cpus();
        mutex_unlock(&buffer->mutex);
 
        return size;
                list_del_init(&bpage->list);
                free_buffer_page(bpage);
        }
+       put_online_cpus();
        mutex_unlock(&buffer->mutex);
        return -ENOMEM;
+
+       /*
+        * Something went totally wrong, and we are too paranoid
+        * to even clean up the mess.
+        */
+ out_fail:
+       put_online_cpus();
+       mutex_unlock(&buffer->mutex);
+       return -1;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_resize);
 
 {
        struct ring_buffer_per_cpu *cpu_buffer;
 
+       get_online_cpus();
+
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return;
+               goto out;
 
        cpu_buffer = buffer->buffers[cpu];
        atomic_inc(&cpu_buffer->record_disabled);
+ out:
+       put_online_cpus();
 }
 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
 
 {
        struct ring_buffer_per_cpu *cpu_buffer;
 
+       get_online_cpus();
+
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return;
+               goto out;
 
        cpu_buffer = buffer->buffers[cpu];
        atomic_dec(&cpu_buffer->record_disabled);
+ out:
+       put_online_cpus();
 }
 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
 
 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
+       unsigned long ret = 0;
+
+       get_online_cpus();
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return 0;
+               goto out;
 
        cpu_buffer = buffer->buffers[cpu];
-       return cpu_buffer->entries;
+       ret = cpu_buffer->entries;
+ out:
+       put_online_cpus();
+
+       return ret;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
 
 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
+       unsigned long ret = 0;
+
+       get_online_cpus();
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return 0;
+               goto out;
 
        cpu_buffer = buffer->buffers[cpu];
-       return cpu_buffer->overrun;
+       ret = cpu_buffer->overrun;
+ out:
+       put_online_cpus();
+
+       return ret;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
 
        unsigned long entries = 0;
        int cpu;
 
+       get_online_cpus();
+
        /* if you care about this being correct, lock the buffer */
        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
                entries += cpu_buffer->entries;
        }
 
+       put_online_cpus();
+
        return entries;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_entries);
        unsigned long overruns = 0;
        int cpu;
 
+       get_online_cpus();
+
        /* if you care about this being correct, lock the buffer */
        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
                overruns += cpu_buffer->overrun;
        }
 
+       put_online_cpus();
+
        return overruns;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
  */
 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
 {
-       struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
+       struct ring_buffer_per_cpu *cpu_buffer;
        unsigned long flags;
 
+       if (!iter)
+               return;
+
+       cpu_buffer = iter->cpu_buffer;
+
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        rb_iter_reset(iter);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
        struct buffer_page *reader;
        int nr_loops = 0;
 
-       if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return NULL;
-
        cpu_buffer = buffer->buffers[cpu];
 
  again:
 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
 {
-       struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
+       struct ring_buffer_per_cpu *cpu_buffer;
-       struct ring_buffer_event *event;
+       struct ring_buffer_event *event = NULL;
        unsigned long flags;
 
+       get_online_cpus();
+
+       if (!cpumask_test_cpu(cpu, buffer->cpumask))
+               goto out;
+
+       cpu_buffer = buffer->buffers[cpu];
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        event = rb_buffer_peek(buffer, cpu, ts);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
+ out:
+       put_online_cpus();
+
        return event;
 }
 
 struct ring_buffer_event *
 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
 {
-       struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
-       struct ring_buffer_event *event;
+       struct ring_buffer_per_cpu *cpu_buffer;
+       struct ring_buffer_event *event = NULL;
        unsigned long flags;
 
+       /* might be called in atomic context, so we can't use get_online_cpus() */
+       preempt_disable();
+
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return NULL;
+               goto out;
 
+       cpu_buffer = buffer->buffers[cpu];
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 
        event = rb_buffer_peek(buffer, cpu, ts);
        if (!event)
-               goto out;
+               goto out_unlock;
 
        rb_advance_reader(cpu_buffer);
 
- out:
+ out_unlock:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
+ out:
+       preempt_enable();
+
        return event;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_consume);
 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
-       struct ring_buffer_iter *iter;
+       struct ring_buffer_iter *iter = NULL;
        unsigned long flags;
 
+       get_online_cpus();
+
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return NULL;
+               goto out;
 
        iter = kmalloc(sizeof(*iter), GFP_KERNEL);
        if (!iter)
-               return NULL;
+               goto out;
 
        cpu_buffer = buffer->buffers[cpu];
 
        __raw_spin_unlock(&cpu_buffer->lock);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
+ out:
+       put_online_cpus();
+
        return iter;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
 {
-       struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
+       struct ring_buffer_per_cpu *cpu_buffer;
        unsigned long flags;
+       int resched;
+
+       /* Can't use get_online_cpus(): this may be called in atomic context */
+       resched = ftrace_preempt_disable();
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return;
+               goto out;
 
+       cpu_buffer = buffer->buffers[cpu];
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 
        __raw_spin_unlock(&cpu_buffer->lock);
 
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+ out:
+       ftrace_preempt_enable(resched);
 }
 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
 
  */
 void ring_buffer_reset(struct ring_buffer *buffer)
 {
+       int resched;
        int cpu;
 
+       /* Can't use get_online_cpus(): this may be called in atomic context */
+       resched = ftrace_preempt_disable();
+
        for_each_buffer_cpu(buffer, cpu)
                ring_buffer_reset_cpu(buffer, cpu);
+
+       ftrace_preempt_enable(resched);
 }
 EXPORT_SYMBOL_GPL(ring_buffer_reset);
 
        struct ring_buffer_per_cpu *cpu_buffer;
        int cpu;
 
+       get_online_cpus();
+
        /* yes this is racy, but if you don't like the race, lock the buffer */
        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
-               if (!rb_per_cpu_empty(cpu_buffer))
-                       return 0;
+               if (!rb_per_cpu_empty(cpu_buffer)) {
+                       put_online_cpus();
+                       return 0;
+               }
        }
+
+       put_online_cpus();
+
        return 1;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_empty);
 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
+       int ret = 1;
+
+       get_online_cpus();
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return 1;
+               goto out;
 
        cpu_buffer = buffer->buffers[cpu];
-       return rb_per_cpu_empty(cpu_buffer);
+       ret = rb_per_cpu_empty(cpu_buffer);
+
+ out:
+       put_online_cpus();
+
+       return ret;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
 
 {
        struct ring_buffer_per_cpu *cpu_buffer_a;
        struct ring_buffer_per_cpu *cpu_buffer_b;
+       int ret = -EINVAL;
+
+       get_online_cpus();
 
        if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
            !cpumask_test_cpu(cpu, buffer_b->cpumask))
-               return -EINVAL;
+               goto out;
 
        /* At least make sure the two buffers are somewhat the same */
        if (buffer_a->pages != buffer_b->pages)
-               return -EINVAL;
+               goto out;
+
+       ret = -EAGAIN;
 
        if (ring_buffer_flags != RB_BUFFERS_ON)
-               return -EAGAIN;
+               goto out;
 
        if (atomic_read(&buffer_a->record_disabled))
-               return -EAGAIN;
+               goto out;
 
        if (atomic_read(&buffer_b->record_disabled))
-               return -EAGAIN;
+               goto out;
 
        cpu_buffer_a = buffer_a->buffers[cpu];
        cpu_buffer_b = buffer_b->buffers[cpu];
 
        if (atomic_read(&cpu_buffer_a->record_disabled))
-               return -EAGAIN;
+               goto out;
 
        if (atomic_read(&cpu_buffer_b->record_disabled))
-               return -EAGAIN;
+               goto out;
 
        /*
         * We can't do a synchronize_sched here because this
        atomic_dec(&cpu_buffer_a->record_disabled);
        atomic_dec(&cpu_buffer_b->record_disabled);
 
-       return 0;
+       ret = 0;
+ out:
+       put_online_cpus();
+
+       return ret;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
 
        u64 save_timestamp;
        int ret = -1;
 
+       get_online_cpus();
+
+       if (!cpumask_test_cpu(cpu, buffer->cpumask))
+               goto out;
+
        /*
         * If len is not big enough to hold the page header, then
         * we can not copy anything.
         */
        if (len <= BUF_PAGE_HDR_SIZE)
-               return -1;
+               goto out;
 
        len -= BUF_PAGE_HDR_SIZE;
 
        if (!data_page)
-               return -1;
+               goto out;
 
        bpage = *data_page;
        if (!bpage)
-               return -1;
+               goto out;
 
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 
        reader = rb_get_reader_page(cpu_buffer);
        if (!reader)
-               goto out;
+               goto out_unlock;
 
        event = rb_reader_event(cpu_buffer);
 
                unsigned int size;
 
                if (full)
-                       goto out;
+                       goto out_unlock;
 
                if (len > (commit - read))
                        len = (commit - read);
                size = rb_event_length(event);
 
                if (len < size)
-                       goto out;
+                       goto out_unlock;
 
                /* save the current timestamp, since the user will need it */
                save_timestamp = cpu_buffer->read_stamp;
        }
        ret = read;
 
- out:
+ out_unlock:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
+ out:
+       put_online_cpus();
+
        return ret;
 }
 
 }
 
 fs_initcall(rb_init_debugfs);
+
+#ifdef CONFIG_HOTPLUG_CPU
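+/*
+ * CPU hotplug callback: allocate and publish a per-cpu buffer when a CPU
+ * comes online.  Nothing is freed when a CPU goes down, so trace data
+ * already recorded on that CPU is preserved.
+ */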
+static int __cpuinit rb_cpu_notify(struct notifier_block *self,
+                                  unsigned long action, void *hcpu)
+{
+       struct ring_buffer *buffer =
+               container_of(self, struct ring_buffer, cpu_notify);
+       long cpu = (long)hcpu;
+
+       switch (action) {
+       case CPU_UP_PREPARE:
+       case CPU_UP_PREPARE_FROZEN:
+               if (cpumask_test_cpu(cpu, buffer->cpumask))
+                       return NOTIFY_OK;
+
+               buffer->buffers[cpu] =
+                       rb_allocate_cpu_buffer(buffer, cpu);
+               if (!buffer->buffers[cpu]) {
+                       WARN(1, "failed to allocate ring buffer on CPU %ld\n",
+                            cpu);
+                       return NOTIFY_OK;
+               }
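+               /*
+                * Make sure the new per-cpu buffer is fully set up before
+                * the cpumask bit makes it visible to readers and writers.
+                */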
+               smp_wmb();
+               cpumask_set_cpu(cpu, buffer->cpumask);
+               break;
+       case CPU_DOWN_PREPARE:
+       case CPU_DOWN_PREPARE_FROZEN:
+               /*
+                * Do nothing.
+                * If we were to free the buffer, then the user would
+                * lose any trace that was in the buffer.
+                */
+               break;
+       default:
+               break;
+       }
+       return NOTIFY_OK;
+}
+#endif