ocfs2: dump lockres info before we BUG() on a bad reference
index 940be4c13b1f09ff4703662f007692a6d4b81e89..427c0af0d219100c4bb5bbf270edf1ec6d66ecdd 100644 (file)
@@ -74,6 +74,7 @@ struct dlm_master_list_entry
        wait_queue_head_t wq;
        atomic_t woken;
        struct kref mle_refs;
+       int inuse;
        unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
@@ -130,15 +131,30 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
 #if 0
 /* Code here is included but defined out as it aids debugging */
 
+#define dlm_print_nodemap(m)  _dlm_print_nodemap(m,#m)
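+/* prints e.g. "maybe=[ 0 3 ]" when bits 0 and 3 of the map are set */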
+void _dlm_print_nodemap(unsigned long *map, const char *mapname)
+{
+       int i;
+       printk("%s=[ ", mapname);
+       for (i=0; i<O2NM_MAX_NODES; i++)
+               if (test_bit(i, map))
+                       printk("%d ", i);
+       printk("]");
+}
+
 void dlm_print_one_mle(struct dlm_master_list_entry *mle)
 {
-       int i = 0, refs;
+       int refs;
        char *type;
        char attached;
        u8 master;
        unsigned int namelen;
        const char *name;
        struct kref *k;
+       unsigned long *maybe = mle->maybe_map,
+                     *vote = mle->vote_map,
+                     *resp = mle->response_map,
+                     *node = mle->node_map;
 
        k = &mle->mle_refs;
        if (mle->type == DLM_MLE_BLOCK)
@@ -159,9 +175,18 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle)
                name = mle->u.res->lockname.name;
        }
 
-       mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %3u %c    (%d)%.*s\n",
-                 i, type, refs, master, mle->new_master, attached,
-                 namelen, namelen, name);
+       mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
+                 namelen, name, type, refs, master, mle->new_master, attached,
+                 mle->inuse);
+       dlm_print_nodemap(maybe);
+       printk(", ");
+       dlm_print_nodemap(vote);
+       printk(", ");
+       dlm_print_nodemap(resp);
+       printk(", ");
+       dlm_print_nodemap(node);
+       printk("\n");
 }
 
 static void dlm_dump_mles(struct dlm_ctxt *dlm)
@@ -170,7 +195,6 @@ static void dlm_dump_mles(struct dlm_ctxt *dlm)
        struct list_head *iter;
        
        mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
-       mlog(ML_NOTICE, "  ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
        spin_lock(&dlm->master_lock);
        list_for_each(iter, &dlm->master_list) {
                mle = list_entry(iter, struct dlm_master_list_entry, list);
@@ -314,6 +338,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
        spin_unlock(&dlm->spinlock);
 }
 
+static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
+{
+       struct dlm_ctxt *dlm;
+       dlm = mle->dlm;
+
+       assert_spin_locked(&dlm->spinlock);
+       assert_spin_locked(&dlm->master_lock);
+       mle->inuse++;
+       kref_get(&mle->mle_refs);
+}
+
+static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
+{
+       struct dlm_ctxt *dlm;
+       dlm = mle->dlm;
+
+       spin_lock(&dlm->spinlock);
+       spin_lock(&dlm->master_lock);
+       mle->inuse--;
+       __dlm_put_mle(mle);
+       spin_unlock(&dlm->master_lock);
+       spin_unlock(&dlm->spinlock);
+}
+
 /* remove from list and free */
 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
 {
@@ -322,9 +371,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle)
 
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
-       BUG_ON(!atomic_read(&mle->mle_refs.refcount));
-
-       kref_put(&mle->mle_refs, dlm_mle_release);
+       if (!atomic_read(&mle->mle_refs.refcount)) {
+               /* this may or may not crash, but who cares.
+                * it's a BUG. */
+               mlog(ML_ERROR, "bad mle: %p\n", mle);
+               dlm_print_one_mle(mle);
+               BUG();
+       } else
+               kref_put(&mle->mle_refs, dlm_mle_release);
 }
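
The two new helpers form a pin/unpin pair layered over the plain kref:
dlm_get_mle_inuse() bumps both counts while the caller already holds
dlm->spinlock and dlm->master_lock, and dlm_put_mle_inuse() reacquires
both before dropping, so an mle pinned before the locks were released
cannot vanish during the lock-free window.  A minimal userspace sketch
of the pattern, with one pthread mutex standing in for the two
spinlocks (names invented for illustration; this is not the kernel API):

#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct mle {
	pthread_mutex_t *lock;	/* stands in for dlm->spinlock + master_lock */
	int refs;		/* models the kref (mle_refs) */
	int inuse;		/* pins held across unlocked windows */
};

/* caller must hold m->lock, as assert_spin_locked() documents above */
static void mle_get_inuse(struct mle *m)
{
	m->inuse++;
	m->refs++;			/* kref_get() */
}

/* __dlm_put_mle() analogue: caller holds the lock; may free m */
static void mle_put(struct mle *m)
{
	assert(m->refs > 0);		/* the "bad mle" check before BUG() */
	if (--m->refs == 0) {
		printf("freeing mle\n");
		free(m);
	}
}

/* dlm_put_mle_inuse() analogue: takes the lock itself */
static void mle_put_inuse(struct mle *m)
{
	pthread_mutex_t *lock = m->lock;	/* m may be freed below */

	pthread_mutex_lock(lock);
	m->inuse--;
	mle_put(m);
	pthread_mutex_unlock(lock);
}

int main(void)
{
	static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
	struct mle *m = calloc(1, sizeof(*m));

	m->lock = &lock;
	m->refs = 1;		/* the ref held by dlm->master_list */

	pthread_mutex_lock(&lock);
	mle_get_inuse(m);	/* pin before releasing the locks */
	pthread_mutex_unlock(&lock);

	/* ... handlers may run here with no locks held ... */

	pthread_mutex_lock(&lock);
	mle_put(m);		/* drop the list ref: refs 2 -> 1 */
	pthread_mutex_unlock(&lock);

	mle_put_inuse(m);	/* drop the pin; last ref frees the mle */
	return 0;
}

Note that the lock pointer is saved before the final put, since a
successful put may free the structure that holds it.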
 
 
@@ -367,6 +421,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
        memset(mle->response_map, 0, sizeof(mle->response_map));
        mle->master = O2NM_MAX_NODES;
        mle->new_master = O2NM_MAX_NODES;
+       mle->inuse = 0;
 
        if (mle->type == DLM_MLE_MASTER) {
                BUG_ON(!res);
@@ -564,6 +619,28 @@ static void dlm_lockres_release(struct kref *kref)
        mlog(0, "destroying lockres %.*s\n", res->lockname.len,
             res->lockname.name);
 
+       if (!hlist_unhashed(&res->hash_node) ||
+           !list_empty(&res->granted) ||
+           !list_empty(&res->converting) ||
+           !list_empty(&res->blocked) ||
+           !list_empty(&res->dirty) ||
+           !list_empty(&res->recovering) ||
+           !list_empty(&res->purge)) {
+               mlog(ML_ERROR,
+                    "Going to BUG for resource %.*s."
+                    "  We're on a list! [%c%c%c%c%c%c%c]\n",
+                    res->lockname.len, res->lockname.name,
+                    !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
+                    !list_empty(&res->granted) ? 'G' : ' ',
+                    !list_empty(&res->converting) ? 'C' : ' ',
+                    !list_empty(&res->blocked) ? 'B' : ' ',
+                    !list_empty(&res->dirty) ? 'D' : ' ',
+                    !list_empty(&res->recovering) ? 'R' : ' ',
+                    !list_empty(&res->purge) ? 'P' : ' ');
+
+               dlm_print_one_lock_resource(res);
+       }
+
        /* By the time we're ready to blow this guy away, we shouldn't
         * be on any lists. */
        BUG_ON(!hlist_unhashed(&res->hash_node));
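
Each letter in the [H G C B D R P] string flags a list the resource
still sits on at release time: the lockres hash, then the granted,
converting, blocked, dirty, recovering and purge lists.  Dumping that,
plus the full lock resource, before the BUG_ON()s fire is the point of
the change: the crash report now says which reference was left behind
instead of merely asserting.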
@@ -579,11 +656,6 @@ static void dlm_lockres_release(struct kref *kref)
        kfree(res);
 }
 
-void dlm_lockres_get(struct dlm_lock_resource *res)
-{
-       kref_get(&res->refs);
-}
-
 void dlm_lockres_put(struct dlm_lock_resource *res)
 {
        kref_put(&res->refs, dlm_lockres_release);
@@ -603,7 +675,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
        memcpy(qname, name, namelen);
 
        res->lockname.len = namelen;
-       res->lockname.hash = full_name_hash(name, namelen);
+       res->lockname.hash = dlm_lockid_hash(name, namelen);
 
        init_waitqueue_head(&res->wq);
        spin_lock_init(&res->spinlock);
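
dlm_lockid_hash() replaces full_name_hash() here, and the handlers
below now compute it once per message and thread it through
__dlm_lookup_lockres(), so a lock name is never rehashed while
dlm->spinlock is held.  A hypothetical userspace sketch of the same
shape (djb2 stands in for dlm_lockid_hash(), which is declared
elsewhere in the dlm code):

#include <stdio.h>
#include <string.h>

#define NBUCKETS 128

struct entry {
	struct entry *next;
	unsigned int hash, len;
	const char *name;
};

static struct entry *buckets[NBUCKETS];

static unsigned int name_hash(const char *name, unsigned int len)
{
	unsigned int h = 5381;		/* djb2, as a placeholder */

	while (len--)
		h = h * 33 + (unsigned char)*name++;
	return h;
}

/* 'hash' was computed once by the caller, outside any lock */
static struct entry *lookup(const char *name, unsigned int len,
			    unsigned int hash)
{
	struct entry *e;

	for (e = buckets[hash % NBUCKETS]; e; e = e->next)
		if (e->hash == hash && e->len == len &&
		    !memcmp(e->name, name, len))
			return e;
	return NULL;
}

int main(void)
{
	static struct entry e = { .name = "somelock", .len = 8 };
	unsigned int hash;

	e.hash = name_hash(e.name, e.len);
	buckets[e.hash % NBUCKETS] = &e;

	hash = name_hash("somelock", 8);	/* hash once up front */
	printf("%s\n", lookup("somelock", 8, hash) ? "found" : "missing");
	return 0;
}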
@@ -677,19 +749,20 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
        int blocked = 0;
        int ret, nodenum;
        struct dlm_node_iter iter;
-       unsigned int namelen;
+       unsigned int namelen, hash;
        int tries = 0;
        int bit, wait_on_recovery = 0;
 
        BUG_ON(!lockid);
 
        namelen = strlen(lockid);
+       hash = dlm_lockid_hash(lockid, namelen);
 
        mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
 
 lookup:
        spin_lock(&dlm->spinlock);
-       tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
+       tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
        if (tmpres) {
                spin_unlock(&dlm->spinlock);
                mlog(0, "found in hash!\n");
@@ -790,7 +863,7 @@ lookup:
         * if so, the creator of the BLOCK may try to put the last
         * ref at this time in the assert master handler, so we
         * need an extra one to keep from a bad ptr deref. */
-       dlm_get_mle(mle);
+       dlm_get_mle_inuse(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
 
@@ -880,7 +953,7 @@ wait:
        dlm_mle_detach_hb_events(dlm, mle);
        dlm_put_mle(mle);
        /* put the extra ref */
-       dlm_put_mle(mle);
+       dlm_put_mle_inuse(mle);
 
 wake_waiters:
        spin_lock(&res->spinlock);
@@ -962,6 +1035,12 @@ recheck:
                     "rechecking now\n", dlm->name, res->lockname.len,
                     res->lockname.name);
                goto recheck;
+       } else {
+               if (!voting_done) {
+                       mlog(0, "map not changed and voting not done "
+                            "for %s:%.*s\n", dlm->name, res->lockname.len,
+                            res->lockname.name);
+               }
        }
 
        if (m != O2NM_MAX_NODES) {
@@ -1316,7 +1395,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
        struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
        struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
        char *name;
-       unsigned int namelen;
+       unsigned int namelen, hash;
        int found, ret;
        int set_maybe;
        int dispatch_assert = 0;
@@ -1331,6 +1410,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
 
        name = request->name;
        namelen = request->namelen;
+       hash = dlm_lockid_hash(name, namelen);
 
        if (namelen > DLM_LOCKID_NAME_MAX) {
                response = DLM_IVBUFLEN;
@@ -1339,7 +1419,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
 
 way_up_top:
        spin_lock(&dlm->spinlock);
-       res = __dlm_lookup_lockres(dlm, name, namelen);
+       res = __dlm_lookup_lockres(dlm, name, namelen, hash);
        if (res) {
                spin_unlock(&dlm->spinlock);
 
@@ -1465,15 +1545,12 @@ way_up_top:
                                mlog_errno(-ENOMEM);
                                goto send_response;
                        }
-                       spin_lock(&dlm->spinlock);
-                       dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
-                                        name, namelen);
-                       spin_unlock(&dlm->spinlock);
                        goto way_up_top;
                }
 
                // mlog(0, "this is second time thru, already allocated, "
                // "add the block.\n");
+               dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
                set_bit(request->node_idx, mle->maybe_map);
                list_add(&mle->list, &dlm->master_list);
                response = DLM_MASTER_RESP_NO;
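
Note the reordering here: dlm_init_mle() no longer runs under
dlm->spinlock in the allocation branch before the goto way_up_top
retry; the preallocated mle is initialized only at the point where it
is actually linked onto dlm->master_list.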
@@ -1612,7 +1689,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
        struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
        struct dlm_lock_resource *res = NULL;
        char *name;
-       unsigned int namelen;
+       unsigned int namelen, hash;
        u32 flags;
        int master_request = 0;
        int ret = 0;
@@ -1622,6 +1699,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
 
        name = assert->name;
        namelen = assert->namelen;
+       hash = dlm_lockid_hash(name, namelen);
        flags = be32_to_cpu(assert->flags);
 
        if (namelen > DLM_LOCKID_NAME_MAX) {
@@ -1646,7 +1724,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
                if (bit >= O2NM_MAX_NODES) {
                        /* not necessarily an error, though less likely.
                         * could be master just re-asserting. */
-                       mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
+                       mlog(0, "no bits set in the maybe_map, but %u "
                             "is asserting! (%.*s)\n", assert->node_idx,
                             namelen, name);
                } else if (bit != assert->node_idx) {
@@ -1658,19 +1736,36 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
                                 * number winning the mastery will respond
                                 * YES to mastery requests, but this node
                                 * had no way of knowing.  let it pass. */
-                               mlog(ML_ERROR, "%u is the lowest node, "
+                               mlog(0, "%u is the lowest node, "
                                     "%u is asserting. (%.*s)  %u must "
                                     "have begun after %u won.\n", bit,
                                     assert->node_idx, namelen, name, bit,
                                     assert->node_idx);
                        }
                }
+               if (mle->type == DLM_MLE_MIGRATION) {
+                       if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
+                               mlog(0, "%s:%.*s: got cleanup assert"
+                                    " from %u for migration\n",
+                                    dlm->name, namelen, name,
+                                    assert->node_idx);
+                       } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
+                               mlog(0, "%s:%.*s: got unrelated assert"
+                                    " from %u for migration, ignoring\n",
+                                    dlm->name, namelen, name,
+                                    assert->node_idx);
+                               __dlm_put_mle(mle);
+                               spin_unlock(&dlm->master_lock);
+                               spin_unlock(&dlm->spinlock);
+                               goto done;
+                       }
+               }
        }
        spin_unlock(&dlm->master_lock);
 
        /* ok everything checks out with the MLE
         * now check to see if there is a lockres */
-       res = __dlm_lookup_lockres(dlm, name, namelen);
+       res = __dlm_lookup_lockres(dlm, name, namelen, hash);
        if (res) {
                spin_lock(&res->spinlock);
                if (res->state & DLM_LOCK_RES_RECOVERING)  {
@@ -1679,7 +1774,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
                        goto kill;
                }
                if (!mle) {
-                       if (res->owner != assert->node_idx) {
+                       if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
+                           res->owner != assert->node_idx) {
                                mlog(ML_ERROR, "assert_master from "
                                          "%u, but current owner is "
                                          "%u! (%.*s)\n",
@@ -1732,6 +1828,7 @@ ok:
        if (mle) {
                int extra_ref = 0;
                int nn = -1;
+               int rr, err = 0;
                
                spin_lock(&mle->spinlock);
                if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
@@ -1751,27 +1848,64 @@ ok:
                wake_up(&mle->wq);
                spin_unlock(&mle->spinlock);
 
-               if (mle->type == DLM_MLE_MIGRATION && res) {
-                       mlog(0, "finishing off migration of lockres %.*s, "
-                            "from %u to %u\n",
-                              res->lockname.len, res->lockname.name,
-                              dlm->node_num, mle->new_master);
+               if (res) {
                        spin_lock(&res->spinlock);
-                       res->state &= ~DLM_LOCK_RES_MIGRATING;
-                       dlm_change_lockres_owner(dlm, res, mle->new_master);
-                       BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
+                       if (mle->type == DLM_MLE_MIGRATION) {
+                               mlog(0, "finishing off migration of lockres %.*s, "
+                                       "from %u to %u\n",
+                                       res->lockname.len, res->lockname.name,
+                                       dlm->node_num, mle->new_master);
+                               res->state &= ~DLM_LOCK_RES_MIGRATING;
+                               dlm_change_lockres_owner(dlm, res, mle->new_master);
+                               BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
+                       } else {
+                               dlm_change_lockres_owner(dlm, res, mle->master);
+                       }
                        spin_unlock(&res->spinlock);
                }
-               /* master is known, detach if not already detached */
-               dlm_mle_detach_hb_events(dlm, mle);
-               dlm_put_mle(mle);
-               
+
+               /* master is known, detach if not already detached.
+                * ensures that only one assert_master call will happen
+                * on this mle. */
+               spin_lock(&dlm->spinlock);
+               spin_lock(&dlm->master_lock);
+
+               rr = atomic_read(&mle->mle_refs.refcount);
+               if (mle->inuse > 0) {
+                       if (extra_ref && rr < 3)
+                               err = 1;
+                       else if (!extra_ref && rr < 2)
+                               err = 1;
+               } else {
+                       if (extra_ref && rr < 2)
+                               err = 1;
+                       else if (!extra_ref && rr < 1)
+                               err = 1;
+               }
+               if (err) {
+                       mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
+                            "that will mess up this node, refs=%d, extra=%d, "
+                            "inuse=%d\n", dlm->name, namelen, name,
+                            assert->node_idx, rr, extra_ref, mle->inuse);
+                       dlm_print_one_mle(mle);
+               }
+               list_del_init(&mle->list);
+               __dlm_mle_detach_hb_events(dlm, mle);
+               __dlm_put_mle(mle);
                if (extra_ref) {
                        /* the assert master message now balances the extra
                         * ref given by the master / migration request message.
                         * if this is the last put, it will be removed
                         * from the list. */
-                       dlm_put_mle(mle);
+                       __dlm_put_mle(mle);
+               }
+               spin_unlock(&dlm->master_lock);
+               spin_unlock(&dlm->spinlock);
+       } else if (res) {
+               if (res->owner != assert->node_idx) {
+                       mlog(0, "assert_master from %u, but current "
+                            "owner is %u (%.*s), no mle\n", assert->node_idx,
+                            res->owner, namelen, name);
                }
        }
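
The rr/extra_ref/inuse branches above all test one lower bound on the
kref count: the mle is still linked on dlm->master_list (one ref), the
caller may still hold the extra ref taken by the request or migration
message handler, and an outstanding inuse pin accounts for at least
one more.  A distilled restatement of that bound (illustrative
userspace code; the function name is invented):

#include <stdio.h>

/*
 * refs:      the kref count read under dlm->master_lock (rr above)
 * extra_ref: caller still holds the ref taken by the request handler
 * inuse:     pins taken via dlm_get_mle_inuse(), each holding a kref
 */
static int mle_refs_look_bad(int refs, int extra_ref, int inuse)
{
	int floor = 1			/* ref held by the master list */
		  + (extra_ref ? 1 : 0)
		  + (inuse > 0 ? 1 : 0);

	return refs < floor;		/* mirrors the err = 1 cases */
}

int main(void)
{
	printf("%d\n", mle_refs_look_bad(2, 1, 1));	/* 1: suspect */
	printf("%d\n", mle_refs_look_bad(3, 1, 1));	/* 0: healthy */
	return 0;
}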
 
@@ -2117,7 +2251,7 @@ fail:
         * take both dlm->spinlock and dlm->master_lock */
        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
-       dlm_get_mle(mle);
+       dlm_get_mle_inuse(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
 
@@ -2134,7 +2268,10 @@ fail:
                /* migration failed, detach and clean up mle */
                dlm_mle_detach_hb_events(dlm, mle);
                dlm_put_mle(mle);
-               dlm_put_mle(mle);
+               dlm_put_mle_inuse(mle);
+               spin_lock(&res->spinlock);
+               res->state &= ~DLM_LOCK_RES_MIGRATING;
+               spin_unlock(&res->spinlock);
                goto leave;
        }
 
@@ -2164,8 +2301,8 @@ fail:
                        /* avoid hang during shutdown when migrating lockres 
                         * to a node which also goes down */
                        if (dlm_is_node_dead(dlm, target)) {
-                               mlog(0, "%s:%.*s: expected migration target %u "
-                                    "is no longer up.  restarting.\n",
+                               mlog(0, "%s:%.*s: expected migration "
+                                    "target %u is no longer up, restarting\n",
                                     dlm->name, res->lockname.len,
                                     res->lockname.name, target);
                                ret = -ERESTARTSYS;
@@ -2175,7 +2312,10 @@ fail:
                        /* migration failed, detach and clean up mle */
                        dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
-                       dlm_put_mle(mle);
+                       dlm_put_mle_inuse(mle);
+                       spin_lock(&res->spinlock);
+                       res->state &= ~DLM_LOCK_RES_MIGRATING;
+                       spin_unlock(&res->spinlock);
                        goto leave;
                }
                /* TODO: if node died: stop, clean up, return error */
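
Both migration failure paths now clear DLM_LOCK_RES_MIGRATING under
the resource spinlock in addition to dropping the mle references;
leaving the flag set would wedge the lockres, since later operations
wait for the migration state to clear before proceeding.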
@@ -2191,7 +2331,7 @@ fail:
 
        /* master is known, detach if not already detached */
        dlm_mle_detach_hb_events(dlm, mle);
-       dlm_put_mle(mle);
+       dlm_put_mle_inuse(mle);
        ret = 0;
 
        dlm_lockres_calc_usage(dlm, res);
@@ -2462,7 +2602,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
        struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
        struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
        const char *name;
-       unsigned int namelen;
+       unsigned int namelen, hash;
        int ret = 0;
 
        if (!dlm_grab(dlm))
@@ -2470,6 +2610,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
 
        name = migrate->name;
        namelen = migrate->namelen;
+       hash = dlm_lockid_hash(name, namelen);
 
        /* preallocate.. if this fails, abort */
        mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
@@ -2482,7 +2623,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
 
        /* check for pre-existing lock */
        spin_lock(&dlm->spinlock);
-       res = __dlm_lookup_lockres(dlm, name, namelen);
+       res = __dlm_lookup_lockres(dlm, name, namelen, hash);
        spin_lock(&dlm->master_lock);
 
        if (res) {
@@ -2580,6 +2721,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
                        /* remove it from the list so that only one
                         * mle will be found */
                        list_del_init(&tmp->list);
+                       __dlm_mle_detach_hb_events(dlm, tmp);
                }
                spin_unlock(&tmp->spinlock);
        }
@@ -2601,6 +2743,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
        struct list_head *iter, *iter2;
        struct dlm_master_list_entry *mle;
        struct dlm_lock_resource *res;
+       unsigned int hash;
 
        mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
 top:
@@ -2673,19 +2816,21 @@ top:
 
                /* remove from the list early.  NOTE: unlinking
                 * list_head while in list_for_each_safe */
+               __dlm_mle_detach_hb_events(dlm, mle);
                spin_lock(&mle->spinlock);
                list_del_init(&mle->list);
                atomic_set(&mle->woken, 1);
                spin_unlock(&mle->spinlock);
                wake_up(&mle->wq);
 
-               mlog(0, "node %u died during migration from "
-                    "%u to %u!\n", dead_node,
+               mlog(0, "%s: node %u died during migration from "
+                    "%u to %u!\n", dlm->name, dead_node,
                     mle->master, mle->new_master);
                /* if there is a lockres associated with this
                 * mle, find it and set its owner to UNKNOWN */
+               hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len);
                res = __dlm_lookup_lockres(dlm, mle->u.name.name,
-                                       mle->u.name.len);
+                                          mle->u.name.len, hash);
                if (res) {
                        /* unfortunately if we hit this rare case, our
                         * lock ordering is messed.  we need to drop