X-Git-Url: http://pilppa.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=fs%2Focfs2%2Fdlm%2Fdlmmaster.c;h=427c0af0d219100c4bb5bbf270edf1ec6d66ecdd;hb=a7f90d83ea8dc8b0999ab7c1c0539af9a6ed69d2;hp=940be4c13b1f09ff4703662f007692a6d4b81e89;hpb=3661f00e2097676847deb01add1a0918044bd816;p=linux-2.6-omap-h63xx.git diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 940be4c13b1..427c0af0d21 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -74,6 +74,7 @@ struct dlm_master_list_entry wait_queue_head_t wq; atomic_t woken; struct kref mle_refs; + int inuse; unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; @@ -130,15 +131,30 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm, #if 0 /* Code here is included but defined out as it aids debugging */ +#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m) +void _dlm_print_nodemap(unsigned long *map, const char *mapname) +{ + int i; + printk("%s=[ ", mapname); + for (i=0; imaybe_map, + *vote = mle->vote_map, + *resp = mle->response_map, + *node = mle->node_map; k = &mle->mle_refs; if (mle->type == DLM_MLE_BLOCK) @@ -159,9 +175,18 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle) name = mle->u.res->lockname.name; } - mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n", - i, type, refs, master, mle->new_master, attached, - namelen, namelen, name); + mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ", + namelen, name, type, refs, master, mle->new_master, attached, + mle->inuse); + dlm_print_nodemap(maybe); + printk(", "); + dlm_print_nodemap(vote); + printk(", "); + dlm_print_nodemap(resp); + printk(", "); + dlm_print_nodemap(node); + printk(", "); + printk("\n"); } static void dlm_dump_mles(struct dlm_ctxt *dlm) @@ -170,7 +195,6 @@ static void dlm_dump_mles(struct dlm_ctxt *dlm) struct list_head *iter; mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name); - mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n"); spin_lock(&dlm->master_lock); list_for_each(iter, &dlm->master_list) { mle = list_entry(iter, struct dlm_master_list_entry, list); @@ -314,6 +338,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm, spin_unlock(&dlm->spinlock); } +static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle) +{ + struct dlm_ctxt *dlm; + dlm = mle->dlm; + + assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&dlm->master_lock); + mle->inuse++; + kref_get(&mle->mle_refs); +} + +static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle) +{ + struct dlm_ctxt *dlm; + dlm = mle->dlm; + + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + mle->inuse--; + __dlm_put_mle(mle); + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + +} + /* remove from list and free */ static void __dlm_put_mle(struct dlm_master_list_entry *mle) { @@ -322,9 +371,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle) assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); - BUG_ON(!atomic_read(&mle->mle_refs.refcount)); - - kref_put(&mle->mle_refs, dlm_mle_release); + if (!atomic_read(&mle->mle_refs.refcount)) { + /* this may or may not crash, but who cares. + * it's a BUG. */ + mlog(ML_ERROR, "bad mle: %p\n", mle); + dlm_print_one_mle(mle); + BUG(); + } else + kref_put(&mle->mle_refs, dlm_mle_release); } @@ -367,6 +421,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, memset(mle->response_map, 0, sizeof(mle->response_map)); mle->master = O2NM_MAX_NODES; mle->new_master = O2NM_MAX_NODES; + mle->inuse = 0; if (mle->type == DLM_MLE_MASTER) { BUG_ON(!res); @@ -564,6 +619,28 @@ static void dlm_lockres_release(struct kref *kref) mlog(0, "destroying lockres %.*s\n", res->lockname.len, res->lockname.name); + if (!hlist_unhashed(&res->hash_node) || + !list_empty(&res->granted) || + !list_empty(&res->converting) || + !list_empty(&res->blocked) || + !list_empty(&res->dirty) || + !list_empty(&res->recovering) || + !list_empty(&res->purge)) { + mlog(ML_ERROR, + "Going to BUG for resource %.*s." + " We're on a list! [%c%c%c%c%c%c%c]\n", + res->lockname.len, res->lockname.name, + !hlist_unhashed(&res->hash_node) ? 'H' : ' ', + !list_empty(&res->granted) ? 'G' : ' ', + !list_empty(&res->converting) ? 'C' : ' ', + !list_empty(&res->blocked) ? 'B' : ' ', + !list_empty(&res->dirty) ? 'D' : ' ', + !list_empty(&res->recovering) ? 'R' : ' ', + !list_empty(&res->purge) ? 'P' : ' '); + + dlm_print_one_lock_resource(res); + } + /* By the time we're ready to blow this guy away, we shouldn't * be on any lists. */ BUG_ON(!hlist_unhashed(&res->hash_node)); @@ -579,11 +656,6 @@ static void dlm_lockres_release(struct kref *kref) kfree(res); } -void dlm_lockres_get(struct dlm_lock_resource *res) -{ - kref_get(&res->refs); -} - void dlm_lockres_put(struct dlm_lock_resource *res) { kref_put(&res->refs, dlm_lockres_release); @@ -603,7 +675,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, memcpy(qname, name, namelen); res->lockname.len = namelen; - res->lockname.hash = full_name_hash(name, namelen); + res->lockname.hash = dlm_lockid_hash(name, namelen); init_waitqueue_head(&res->wq); spin_lock_init(&res->spinlock); @@ -677,19 +749,20 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, int blocked = 0; int ret, nodenum; struct dlm_node_iter iter; - unsigned int namelen; + unsigned int namelen, hash; int tries = 0; int bit, wait_on_recovery = 0; BUG_ON(!lockid); namelen = strlen(lockid); + hash = dlm_lockid_hash(lockid, namelen); mlog(0, "get lockres %s (len %d)\n", lockid, namelen); lookup: spin_lock(&dlm->spinlock); - tmpres = __dlm_lookup_lockres(dlm, lockid, namelen); + tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); if (tmpres) { spin_unlock(&dlm->spinlock); mlog(0, "found in hash!\n"); @@ -790,7 +863,7 @@ lookup: * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we * need an extra one to keep from a bad ptr deref. */ - dlm_get_mle(mle); + dlm_get_mle_inuse(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); @@ -880,7 +953,7 @@ wait: dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); /* put the extra ref */ - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); wake_waiters: spin_lock(&res->spinlock); @@ -962,6 +1035,12 @@ recheck: "rechecking now\n", dlm->name, res->lockname.len, res->lockname.name); goto recheck; + } else { + if (!voting_done) { + mlog(0, "map not changed and voting not done " + "for %s:%.*s\n", dlm->name, res->lockname.len, + res->lockname.name); + } } if (m != O2NM_MAX_NODES) { @@ -1316,7 +1395,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; char *name; - unsigned int namelen; + unsigned int namelen, hash; int found, ret; int set_maybe; int dispatch_assert = 0; @@ -1331,6 +1410,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) name = request->name; namelen = request->namelen; + hash = dlm_lockid_hash(name, namelen); if (namelen > DLM_LOCKID_NAME_MAX) { response = DLM_IVBUFLEN; @@ -1339,7 +1419,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) way_up_top: spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, namelen); + res = __dlm_lookup_lockres(dlm, name, namelen, hash); if (res) { spin_unlock(&dlm->spinlock); @@ -1465,15 +1545,12 @@ way_up_top: mlog_errno(-ENOMEM); goto send_response; } - spin_lock(&dlm->spinlock); - dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, - name, namelen); - spin_unlock(&dlm->spinlock); goto way_up_top; } // mlog(0, "this is second time thru, already allocated, " // "add the block.\n"); + dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen); set_bit(request->node_idx, mle->maybe_map); list_add(&mle->list, &dlm->master_list); response = DLM_MASTER_RESP_NO; @@ -1612,7 +1689,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf; struct dlm_lock_resource *res = NULL; char *name; - unsigned int namelen; + unsigned int namelen, hash; u32 flags; int master_request = 0; int ret = 0; @@ -1622,6 +1699,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) name = assert->name; namelen = assert->namelen; + hash = dlm_lockid_hash(name, namelen); flags = be32_to_cpu(assert->flags); if (namelen > DLM_LOCKID_NAME_MAX) { @@ -1646,7 +1724,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) if (bit >= O2NM_MAX_NODES) { /* not necessarily an error, though less likely. * could be master just re-asserting. */ - mlog(ML_ERROR, "no bits set in the maybe_map, but %u " + mlog(0, "no bits set in the maybe_map, but %u " "is asserting! (%.*s)\n", assert->node_idx, namelen, name); } else if (bit != assert->node_idx) { @@ -1658,19 +1736,36 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) * number winning the mastery will respond * YES to mastery requests, but this node * had no way of knowing. let it pass. */ - mlog(ML_ERROR, "%u is the lowest node, " + mlog(0, "%u is the lowest node, " "%u is asserting. (%.*s) %u must " "have begun after %u won.\n", bit, assert->node_idx, namelen, name, bit, assert->node_idx); } } + if (mle->type == DLM_MLE_MIGRATION) { + if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) { + mlog(0, "%s:%.*s: got cleanup assert" + " from %u for migration\n", + dlm->name, namelen, name, + assert->node_idx); + } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) { + mlog(0, "%s:%.*s: got unrelated assert" + " from %u for migration, ignoring\n", + dlm->name, namelen, name, + assert->node_idx); + __dlm_put_mle(mle); + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + goto done; + } + } } spin_unlock(&dlm->master_lock); /* ok everything checks out with the MLE * now check to see if there is a lockres */ - res = __dlm_lookup_lockres(dlm, name, namelen); + res = __dlm_lookup_lockres(dlm, name, namelen, hash); if (res) { spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { @@ -1679,7 +1774,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) goto kill; } if (!mle) { - if (res->owner != assert->node_idx) { + if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN && + res->owner != assert->node_idx) { mlog(ML_ERROR, "assert_master from " "%u, but current owner is " "%u! (%.*s)\n", @@ -1732,6 +1828,7 @@ ok: if (mle) { int extra_ref = 0; int nn = -1; + int rr, err = 0; spin_lock(&mle->spinlock); if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) @@ -1751,27 +1848,64 @@ ok: wake_up(&mle->wq); spin_unlock(&mle->spinlock); - if (mle->type == DLM_MLE_MIGRATION && res) { - mlog(0, "finishing off migration of lockres %.*s, " - "from %u to %u\n", - res->lockname.len, res->lockname.name, - dlm->node_num, mle->new_master); + if (res) { spin_lock(&res->spinlock); - res->state &= ~DLM_LOCK_RES_MIGRATING; - dlm_change_lockres_owner(dlm, res, mle->new_master); - BUG_ON(res->state & DLM_LOCK_RES_DIRTY); + if (mle->type == DLM_MLE_MIGRATION) { + mlog(0, "finishing off migration of lockres %.*s, " + "from %u to %u\n", + res->lockname.len, res->lockname.name, + dlm->node_num, mle->new_master); + res->state &= ~DLM_LOCK_RES_MIGRATING; + dlm_change_lockres_owner(dlm, res, mle->new_master); + BUG_ON(res->state & DLM_LOCK_RES_DIRTY); + } else { + dlm_change_lockres_owner(dlm, res, mle->master); + } spin_unlock(&res->spinlock); } - /* master is known, detach if not already detached */ - dlm_mle_detach_hb_events(dlm, mle); - dlm_put_mle(mle); - + + /* master is known, detach if not already detached. + * ensures that only one assert_master call will happen + * on this mle. */ + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + + rr = atomic_read(&mle->mle_refs.refcount); + if (mle->inuse > 0) { + if (extra_ref && rr < 3) + err = 1; + else if (!extra_ref && rr < 2) + err = 1; + } else { + if (extra_ref && rr < 2) + err = 1; + else if (!extra_ref && rr < 1) + err = 1; + } + if (err) { + mlog(ML_ERROR, "%s:%.*s: got assert master from %u " + "that will mess up this node, refs=%d, extra=%d, " + "inuse=%d\n", dlm->name, namelen, name, + assert->node_idx, rr, extra_ref, mle->inuse); + dlm_print_one_mle(mle); + } + list_del_init(&mle->list); + __dlm_mle_detach_hb_events(dlm, mle); + __dlm_put_mle(mle); if (extra_ref) { /* the assert master message now balances the extra * ref given by the master / migration request message. * if this is the last put, it will be removed * from the list. */ - dlm_put_mle(mle); + __dlm_put_mle(mle); + } + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + } else if (res) { + if (res->owner != assert->node_idx) { + mlog(0, "assert_master from %u, but current " + "owner is %u (%.*s), no mle\n", assert->node_idx, + res->owner, namelen, name); } } @@ -2117,7 +2251,7 @@ fail: * take both dlm->spinlock and dlm->master_lock */ spin_lock(&dlm->spinlock); spin_lock(&dlm->master_lock); - dlm_get_mle(mle); + dlm_get_mle_inuse(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); @@ -2134,7 +2268,10 @@ fail: /* migration failed, detach and clean up mle */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_MIGRATING; + spin_unlock(&res->spinlock); goto leave; } @@ -2164,8 +2301,8 @@ fail: /* avoid hang during shutdown when migrating lockres * to a node which also goes down */ if (dlm_is_node_dead(dlm, target)) { - mlog(0, "%s:%.*s: expected migration target %u " - "is no longer up. restarting.\n", + mlog(0, "%s:%.*s: expected migration " + "target %u is no longer up, restarting\n", dlm->name, res->lockname.len, res->lockname.name, target); ret = -ERESTARTSYS; @@ -2175,7 +2312,10 @@ fail: /* migration failed, detach and clean up mle */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_MIGRATING; + spin_unlock(&res->spinlock); goto leave; } /* TODO: if node died: stop, clean up, return error */ @@ -2191,7 +2331,7 @@ fail: /* master is known, detach if not already detached */ dlm_mle_detach_hb_events(dlm, mle); - dlm_put_mle(mle); + dlm_put_mle_inuse(mle); ret = 0; dlm_lockres_calc_usage(dlm, res); @@ -2462,7 +2602,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *oldmle = NULL; const char *name; - unsigned int namelen; + unsigned int namelen, hash; int ret = 0; if (!dlm_grab(dlm)) @@ -2470,6 +2610,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) name = migrate->name; namelen = migrate->namelen; + hash = dlm_lockid_hash(name, namelen); /* preallocate.. if this fails, abort */ mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, @@ -2482,7 +2623,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) /* check for pre-existing lock */ spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, namelen); + res = __dlm_lookup_lockres(dlm, name, namelen, hash); spin_lock(&dlm->master_lock); if (res) { @@ -2580,6 +2721,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, /* remove it from the list so that only one * mle will be found */ list_del_init(&tmp->list); + __dlm_mle_detach_hb_events(dlm, mle); } spin_unlock(&tmp->spinlock); } @@ -2601,6 +2743,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) struct list_head *iter, *iter2; struct dlm_master_list_entry *mle; struct dlm_lock_resource *res; + unsigned int hash; mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); top: @@ -2673,19 +2816,21 @@ top: /* remove from the list early. NOTE: unlinking * list_head while in list_for_each_safe */ + __dlm_mle_detach_hb_events(dlm, mle); spin_lock(&mle->spinlock); list_del_init(&mle->list); atomic_set(&mle->woken, 1); spin_unlock(&mle->spinlock); wake_up(&mle->wq); - mlog(0, "node %u died during migration from " - "%u to %u!\n", dead_node, + mlog(0, "%s: node %u died during migration from " + "%u to %u!\n", dlm->name, dead_node, mle->master, mle->new_master); /* if there is a lockres associated with this * mle, find it and set its owner to UNKNOWN */ + hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len); res = __dlm_lookup_lockres(dlm, mle->u.name.name, - mle->u.name.len); + mle->u.name.len, hash); if (res) { /* unfortunately if we hit this rare case, our * lock ordering is messed. we need to drop