New name for the ramdisk parameter.
                        See Documentation/ramdisk.txt.
 
+       rcu.blimit=     [KNL,BOOT] Set maximum number of finished
+                       RCU callbacks to process in one batch.
+
+       rcu.qhimark=    [KNL,BOOT] Set threshold of queued
+                       RCU callbacks over which batch limiting is disabled.
+
+       rcu.qlowmark=   [KNL,BOOT] Set threshold of queued
+                       RCU callbacks below which batch limiting is re-enabled.
+
+       rcu.rsinterval= [KNL,BOOT,SMP] Set the number of additional
+                       RCU callbacks that must be queued before a
+                       reschedule is forced on all cpus.
+
        rdinit=         [KNL]
                        Format: <full_path>
                        Run specified binary instead of /init from the ramdisk,
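As a worked example for the four rcu.* entries above (the values are purely
illustrative, not tuning advice), all of them can be combined on one boot
command line:

        rcu.blimit=20 rcu.qhimark=5000 rcu.qlowmark=50 rcu.rsinterval=500

They correspond one-to-one to the module_param() declarations added at the
end of this patch.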
 
        long            batch;           /* Batch # for current RCU batch */
        struct rcu_head *nxtlist;
        struct rcu_head **nxttail;
-       long            count; /* # of queued items */
+       long            qlen;            /* # of queued callbacks */
        struct rcu_head *curlist;
        struct rcu_head **curtail;
        struct rcu_head *donelist;
        struct rcu_head **donetail;
+       long            blimit;          /* Upper limit on a processed batch */
        int cpu;
        struct rcu_head barrier;
+#ifdef CONFIG_SMP
+       long            last_rs_qlen;    /* qlen during the last resched */
+#endif
 };
 
 DECLARE_PER_CPU(struct rcu_data, rcu_data);
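As a minimal sketch of the per-cpu access pattern this structure is designed
for (mirroring call_rcu() below; the field and variable names come from this
patch, and interrupts are disabled so the cpu cannot change underneath us):

        unsigned long flags;
        struct rcu_data *rdp;

        local_irq_save(flags);
        rdp = &__get_cpu_var(rcu_data); /* this cpu's private queue state */
        if (rdp->qlen > qhimark)        /* queue growing dangerously long? */
                rdp->blimit = INT_MAX;  /* stop limiting the batch size */
        local_irq_restore(flags);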
 
 
 /* Fake initialization required by compiler */
 static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
-static int maxbatch = 10000;
+static int blimit = 10;        /* default max callbacks per batch */
+static int qhimark = 10000;    /* ignore blimit above this many queued */
+static int qlowmark = 100;     /* restore blimit below this many queued */
+#ifdef CONFIG_SMP
+static int rsinterval = 1000;  /* new callbacks between forced rescheds */
+#endif
+
+static atomic_t rcu_barrier_cpu_count;
+static struct semaphore rcu_barrier_sema;
+static struct completion rcu_barrier_completion;
+
+#ifdef CONFIG_SMP
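+/*
+ * Too many callbacks are waiting for a grace period.  Mark the local
+ * task for reschedule and, at most once per rsinterval newly queued
+ * callbacks, send a reschedule IPI to every other cpu that has not
+ * yet passed through a quiescent state (those still in rcp->cpumask).
+ */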
+static void force_quiescent_state(struct rcu_data *rdp,
+                       struct rcu_ctrlblk *rcp)
+{
+       int cpu;
+       cpumask_t cpumask;
+       set_need_resched();
+       if (unlikely(rdp->qlen - rdp->last_rs_qlen > rsinterval)) {
+               rdp->last_rs_qlen = rdp->qlen;
+               /*
+                * Don't send IPI to itself. With irqs disabled,
+                * rdp->cpu is the current cpu.
+                */
+               cpumask = rcp->cpumask;
+               cpu_clear(rdp->cpu, cpumask);
+               for_each_cpu_mask(cpu, cpumask)
+                       smp_send_reschedule(cpu);
+       }
+}
+#else
+static inline void force_quiescent_state(struct rcu_data *rdp,
+                       struct rcu_ctrlblk *rcp)
+{
+       set_need_resched();
+}
+#endif
 
 /**
  * call_rcu - Queue an RCU callback for invocation after a grace period.
        rdp = &__get_cpu_var(rcu_data);
        *rdp->nxttail = head;
        rdp->nxttail = &head->next;
-
-       if (unlikely(++rdp->count > 10000))
-               set_need_resched();
-
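+       /* Over qhimark: disable batch limiting until the queue drains. */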
+       if (unlikely(++rdp->qlen > qhimark)) {
+               rdp->blimit = INT_MAX;
+               force_quiescent_state(rdp, &rcu_ctrlblk);
+       }
        local_irq_restore(flags);
 }
 
-static atomic_t rcu_barrier_cpu_count;
-static struct semaphore rcu_barrier_sema;
-static struct completion rcu_barrier_completion;
-
 /**
  * call_rcu_bh - Queue an RCU callback for invocation after a quicker grace period.
  * @head: structure to be used for queueing the RCU updates.
        rdp = &__get_cpu_var(rcu_bh_data);
        *rdp->nxttail = head;
        rdp->nxttail = &head->next;
-       rdp->count++;
-/*
- *  Should we directly call rcu_do_batch() here ?
- *  if (unlikely(rdp->count > 10000))
- *      rcu_do_batch(rdp);
- */
+
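+       /* Same overload handling as in call_rcu() above. */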
+       if (unlikely(++rdp->qlen > qhimark)) {
+               rdp->blimit = INT_MAX;
+               force_quiescent_state(rdp, &rcu_bh_ctrlblk);
+       }
+
        local_irq_restore(flags);
 }
 
                next = rdp->donelist = list->next;
                list->func(list);
                list = next;
-               rdp->count--;
-               if (++count >= maxbatch)
+               rdp->qlen--;
+               if (++count >= rdp->blimit)
                        break;
        }
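+       /* Limiting was disabled and the queue has drained: re-enable it. */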
+       if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
+               rdp->blimit = blimit;
        if (!rdp->donelist)
                rdp->donetail = &rdp->donelist;
        else
        rdp->quiescbatch = rcp->completed;
        rdp->qs_pending = 0;
        rdp->cpu = cpu;
+       rdp->blimit = blimit;
 }
 
 static void __devinit rcu_online_cpu(int cpu)
        synchronize_rcu();
 }
 
-module_param(maxbatch, int, 0);
+module_param(blimit, int, 0);
+module_param(qhimark, int, 0);
+module_param(qlowmark, int, 0);
+#ifdef CONFIG_SMP
+module_param(rsinterval, int, 0);
+#endif
 EXPORT_SYMBOL_GPL(rcu_batches_completed);
 EXPORT_SYMBOL(call_rcu);  /* WARNING: GPL-only in April 2006. */
 EXPORT_SYMBOL(call_rcu_bh);  /* WARNING: GPL-only in April 2006. */