Pull bugzilla-7880 into release branch
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 367a11e9e2ed3b30f10ca1da7cc3c2d090184dae..a2c33160bfd6838dd17487b49492f6a85e5b3539 100644
@@ -158,25 +158,20 @@ void dlm_dispatch_work(struct work_struct *work)
        struct dlm_ctxt *dlm =
                container_of(work, struct dlm_ctxt, dispatched_work);
        LIST_HEAD(tmp_list);
-       struct list_head *iter, *iter2;
-       struct dlm_work_item *item;
+       struct dlm_work_item *item, *next;
        dlm_workfunc_t *workfunc;
        int tot=0;
 
-       if (!dlm_joined(dlm))
-               return;
-
        spin_lock(&dlm->work_lock);
        list_splice_init(&dlm->work_list, &tmp_list);
        spin_unlock(&dlm->work_lock);
 
-       list_for_each_safe(iter, iter2, &tmp_list) {
+       list_for_each_entry(item, &tmp_list, list) {
                tot++;
        }
        mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);
 
-       list_for_each_safe(iter, iter2, &tmp_list) {
-               item = list_entry(iter, struct dlm_work_item, list);
+       list_for_each_entry_safe(item, next, &tmp_list, list) {
                workfunc = item->func;
                list_del_init(&item->list);
 
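This is the first of many conversions in this diff from the open-coded list_for_each()/list_entry() pair to the typed list_for_each_entry() helpers; the _safe variants additionally cache the next node so the current entry can be unlinked mid-walk. A minimal sketch of the idiom (kernel context; the struct and function names here are hypothetical):

    #include <linux/list.h>
    #include <linux/slab.h>

    struct work_node {
            struct list_head list;
            int payload;
    };

    static void drain(struct list_head *head)
    {
            struct work_node *item, *next;

            /* "next" caches item->list.next up front, so deleting
             * and freeing "item" cannot break the traversal */
            list_for_each_entry_safe(item, next, head, list) {
                    list_del_init(&item->list);
                    kfree(item);
            }
    }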
@@ -552,7 +547,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
 {
        int status = 0;
        struct dlm_reco_node_data *ndata;
-       struct list_head *iter;
        int all_nodes_done;
        int destroy = 0;
        int pass = 0;
@@ -570,8 +564,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
 
        /* safe to access the node data list without a lock, since this
         * process is the only one to change the list */
-       list_for_each(iter, &dlm->reco.node_data) {
-               ndata = list_entry (iter, struct dlm_reco_node_data, list);
+       list_for_each_entry(ndata, &dlm->reco.node_data, list) {
                BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT);
                ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
 
@@ -614,6 +607,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                        }
                } while (status != 0);
 
+               spin_lock(&dlm_reco_state_lock);
                switch (ndata->state) {
                        case DLM_RECO_NODE_DATA_INIT:
                        case DLM_RECO_NODE_DATA_FINALIZE_SENT:
@@ -644,6 +638,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                                     ndata->node_num, dead_node);
                                break;
                }
+               spin_unlock(&dlm_reco_state_lock);
        }
 
        mlog(0, "done requesting all lock info\n");
@@ -656,9 +651,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                 * done, or if anyone died */
                all_nodes_done = 1;
                spin_lock(&dlm_reco_state_lock);
-               list_for_each(iter, &dlm->reco.node_data) {
-                       ndata = list_entry (iter, struct dlm_reco_node_data, list);
-
+               list_for_each_entry(ndata, &dlm->reco.node_data, list) {
                        mlog(0, "checking recovery state of node %u\n",
                             ndata->node_num);
                        switch (ndata->state) {
@@ -775,16 +768,14 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
 
 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
 {
-       struct list_head *iter, *iter2;
-       struct dlm_reco_node_data *ndata;
+       struct dlm_reco_node_data *ndata, *next;
        LIST_HEAD(tmplist);
 
        spin_lock(&dlm_reco_state_lock);
        list_splice_init(&dlm->reco.node_data, &tmplist);
        spin_unlock(&dlm_reco_state_lock);
 
-       list_for_each_safe(iter, iter2, &tmplist) {
-               ndata = list_entry (iter, struct dlm_reco_node_data, list);
+       list_for_each_entry_safe(ndata, next, &tmplist, list) {
                list_del_init(&ndata->list);
                kfree(ndata);
        }
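dlm_destroy_recovery_area() uses the splice-then-free teardown idiom: list_splice_init() detaches every node in O(1) while dlm_reco_state_lock is held, after which the private copy can be walked and kfree()d with no lock taken. The post-patch function, consolidated from the hunk above:

    static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
    {
            struct dlm_reco_node_data *ndata, *next;
            LIST_HEAD(tmplist);

            /* detach the whole list in O(1) under the lock */
            spin_lock(&dlm_reco_state_lock);
            list_splice_init(&dlm->reco.node_data, &tmplist);
            spin_unlock(&dlm_reco_state_lock);

            /* tmplist is now private to this thread; free lock-free */
            list_for_each_entry_safe(ndata, next, &tmplist, list) {
                    list_del_init(&ndata->list);
                    kfree(ndata);
            }
    }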
@@ -821,7 +812,8 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
 
 }
 
-int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
+                                 void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
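Every o2net message handler in this file gains a fourth void **ret_data parameter. That tracks a change to the o2net handler typedef in the cluster stack; cluster/tcp.h is outside this diff, so the declaration below is an assumed sketch rather than a quote:

    /* assumed post-change form of the typedef in cluster/tcp.h */
    typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len,
                                         void *data, void **ret_data);

Handlers with nothing to pass back, like those in this file, simply leave *ret_data untouched.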
@@ -876,7 +868,6 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
        struct dlm_lock_resource *res;
        struct dlm_ctxt *dlm;
        LIST_HEAD(resources);
-       struct list_head *iter;
        int ret;
        u8 dead_node, reco_master;
        int skip_all_done = 0;
@@ -920,8 +911,7 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
 
        /* any errors returned will be due to the new_master dying,
         * the dlm_reco_thread should detect this */
-       list_for_each(iter, &resources) {
-               res = list_entry (iter, struct dlm_lock_resource, recovering);
+       list_for_each_entry(res, &resources, recovering) {
                ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
                                        DLM_MRES_RECOVERY);
                if (ret < 0) {
@@ -978,11 +968,11 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
 }
 
 
-int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
+                              void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
-       struct list_head *iter;
        struct dlm_reco_node_data *ndata = NULL;
        int ret = -EINVAL;
 
@@ -999,8 +989,7 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
                        dlm->reco.dead_node, done->node_idx, dlm->node_num);
 
        spin_lock(&dlm_reco_state_lock);
-       list_for_each(iter, &dlm->reco.node_data) {
-               ndata = list_entry (iter, struct dlm_reco_node_data, list);
+       list_for_each_entry(ndata, &dlm->reco.node_data, list) {
                if (ndata->node_num != done->node_idx)
                        continue;
 
@@ -1048,13 +1037,11 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
                                        struct list_head *list,
                                        u8 dead_node)
 {
-       struct dlm_lock_resource *res;
-       struct list_head *iter, *iter2;
+       struct dlm_lock_resource *res, *next;
        struct dlm_lock *lock;
 
        spin_lock(&dlm->spinlock);
-       list_for_each_safe(iter, iter2, &dlm->reco.resources) {
-               res = list_entry (iter, struct dlm_lock_resource, recovering);
+       list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
                /* always prune any $RECOVERY entries for dead nodes,
                 * otherwise hangs can occur during later recovery */
                if (dlm_is_recovery_lock(res->lockname.name,
@@ -1129,6 +1116,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
        if (total_locks == mres_total_locks)
                mres->flags |= DLM_MRES_ALL_DONE;
 
+       mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
+            dlm->name, res->lockname.len, res->lockname.name,
+            orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
+            send_to);
+
        /* send it */
        ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
                                 sz, send_to, &status);
@@ -1163,7 +1155,7 @@ static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
                                        u8 flags, u8 master)
 {
        /* mres here is one full page */
-       memset(mres, 0, PAGE_SIZE);
+       clear_page(mres);
        mres->lockname_len = namelen;
        memcpy(mres->lockname, lockname, namelen);
        mres->num_locks = 0;
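memset(mres, 0, PAGE_SIZE) and clear_page(mres) zero the same bytes; clear_page() takes the architecture's optimized page-clearing path but requires a page-aligned buffer of exactly PAGE_SIZE, which holds here because mres is one full page (per the comment above):

    /* interchangeable only because mres is one full,
     * page-aligned page */
    memset(mres, 0, PAGE_SIZE);     /* generic byte-wise zeroing   */
    clear_page(mres);               /* arch-optimized equivalent   */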
@@ -1213,12 +1205,40 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
        return 0;
 }
 
+static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
+                              struct dlm_migratable_lockres *mres)
+{
+       struct dlm_lock dummy;
+       memset(&dummy, 0, sizeof(dummy));
+       dummy.ml.cookie = 0;
+       dummy.ml.type = LKM_IVMODE;
+       dummy.ml.convert_type = LKM_IVMODE;
+       dummy.ml.highest_blocked = LKM_IVMODE;
+       dummy.lksb = NULL;
+       dummy.ml.node = dlm->node_num;
+       dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
+}
+
+static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
+                                   struct dlm_migratable_lock *ml,
+                                   u8 *nodenum)
+{
+       if (unlikely(ml->cookie == 0 &&
+           ml->type == LKM_IVMODE &&
+           ml->convert_type == LKM_IVMODE &&
+           ml->highest_blocked == LKM_IVMODE &&
+           ml->list == DLM_BLOCKED_LIST)) {
+               *nodenum = ml->node;
+               return 1;
+       }
+       return 0;
+}
 
 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                         struct dlm_migratable_lockres *mres,
                         u8 send_to, u8 flags)
 {
-       struct list_head *queue, *iter;
+       struct list_head *queue;
        int total_locks, i;
        u64 mig_cookie = 0;
        struct dlm_lock *lock;
@@ -1244,9 +1264,7 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
        total_locks = 0;
        for (i=DLM_GRANTED_LIST; i<=DLM_BLOCKED_LIST; i++) {
                queue = dlm_list_idx_to_ptr(res, i);
-               list_for_each(iter, queue) {
-                       lock = list_entry (iter, struct dlm_lock, list);
-
+               list_for_each_entry(lock, queue, list) {
                        /* add another lock. */
                        total_locks++;
                        if (!dlm_add_lock_to_array(lock, mres, i))
@@ -1260,6 +1278,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                                goto error;
                }
        }
+       if (total_locks == 0) {
+               /* send a dummy lock to indicate a mastery reference only */
+               mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
+                    dlm->name, res->lockname.len, res->lockname.name,
+                    send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
+                    "migration");
+               dlm_add_dummy_lock(dlm, mres);
+       }
        /* flush any remaining locks */
        ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
        if (ret < 0)
@@ -1293,7 +1319,8 @@ error:
  * do we spin?  returning an error only delays the problem really
  */
 
-int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+                           void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_migratable_lockres *mres =
@@ -1382,17 +1409,21 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
                spin_lock(&res->spinlock);
                res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
                spin_unlock(&res->spinlock);
+               wake_up(&res->wq);
 
                /* add an extra ref for just-allocated lockres 
                 * otherwise the lockres will be purged immediately */
                dlm_lockres_get(res);
-
        }
 
        /* at this point we have allocated everything we need,
         * and we have a hashed lockres with an extra ref and
         * the proper res->state flags. */
        ret = 0;
+       spin_lock(&res->spinlock);
+       /* drop this either when master requery finds a different master
+        * or when a lock is added by the recovery worker */
+       dlm_lockres_grab_inflight_ref(dlm, res);
        if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
                /* migration cannot have an unknown master */
                BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1400,10 +1431,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
                          "unknown owner.. will need to requery: "
                          "%.*s\n", mres->lockname_len, mres->lockname);
        } else {
-               spin_lock(&res->spinlock);
+               /* take a reference now to pin the lockres, drop it
+                * when locks are added in the worker */
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
-               spin_unlock(&res->spinlock);
        }
+       spin_unlock(&res->spinlock);
 
        /* queue up work for dlm_mig_lockres_worker */
        dlm_grab(dlm);  /* get an extra ref for the work item */
@@ -1459,6 +1491,9 @@ again:
                                   "this node will take it.\n",
                                   res->lockname.len, res->lockname.name);
                } else {
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_drop_inflight_ref(dlm, res);
+                       spin_unlock(&res->spinlock);
                        mlog(0, "master needs to respond to sender "
                                  "that node %u still owns %.*s\n",
                                  real_master, res->lockname.len,
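The grab/drop pair pins the lockres across the asynchronous hand-off: dlm_mig_lockres_handler() takes an inflight reference before queueing the work item, and that reference is released either here, when the requery finds the old master still owns the resource, or at the end of dlm_process_recovery_data() once the locks (or the dummy) have been attached (see the "balance the ref" hunk below). The lifecycle, condensed:

    /* handler: pin the lockres until the worker has run */
    spin_lock(&res->spinlock);
    dlm_lockres_grab_inflight_ref(dlm, res);
    spin_unlock(&res->spinlock);
    /* ...queue dlm_mig_lockres_worker()... */

    /* worker, requery path: another node still masters it, so the
     * reference taken above is dropped without adding any locks */
    spin_lock(&res->spinlock);
    dlm_lockres_drop_inflight_ref(dlm, res);
    spin_unlock(&res->spinlock);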
@@ -1578,7 +1613,8 @@ int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 /* this function cannot error, so unless the sending
  * or receiving of the message failed, the owner can
  * be trusted */
-int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
+                              void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
@@ -1660,21 +1696,37 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
 {
        struct dlm_migratable_lock *ml;
        struct list_head *queue;
+       struct list_head *tmpq = NULL;
        struct dlm_lock *newlock = NULL;
        struct dlm_lockstatus *lksb = NULL;
        int ret = 0;
-       int i, bad;
-       struct list_head *iter;
+       int i, j, bad;
        struct dlm_lock *lock = NULL;
+       u8 from = O2NM_MAX_NODES;
+       unsigned int added = 0;
 
        mlog(0, "running %d locks for this lockres\n", mres->num_locks);
        for (i=0; i<mres->num_locks; i++) {
                ml = &(mres->ml[i]);
+
+               if (dlm_is_dummy_lock(dlm, ml, &from)) {
+                       /* placeholder, just need to set the refmap bit */
+                       BUG_ON(mres->num_locks != 1);
+                       mlog(0, "%s:%.*s: dummy lock for %u\n",
+                            dlm->name, mres->lockname_len, mres->lockname,
+                            from);
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_set_refmap_bit(from, res);
+                       spin_unlock(&res->spinlock);
+                       added++;
+                       break;
+               }
                BUG_ON(ml->highest_blocked != LKM_IVMODE);
                newlock = NULL;
                lksb = NULL;
 
                queue = dlm_list_num_to_pointer(res, ml->list);
+               tmpq = NULL;
 
                /* if the lock is for the local node it needs to
                 * be moved to the proper location within the queue.
@@ -1684,26 +1736,38 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                        BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
 
                        spin_lock(&res->spinlock);
-                       list_for_each(iter, queue) {
-                               lock = list_entry (iter, struct dlm_lock, list);
-                               if (lock->ml.cookie != ml->cookie)
-                                       lock = NULL;
-                               else
+                       for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
+                               tmpq = dlm_list_idx_to_ptr(res, j);
+                               list_for_each_entry(lock, tmpq, list)
+                                       if (lock->ml.cookie == ml->cookie)
+                                               break;
+                               /* cursor wrapped to head: not on this list */
+                               if (&lock->list == tmpq)
+                                       lock = NULL;
+                               if (lock)
                                        break;
                        }
 
                        /* lock is always created locally first, and
                         * destroyed locally last.  it must be on the list */
                        if (!lock) {
-                               u64 c = ml->cookie;
+                               __be64 c = ml->cookie;
                                mlog(ML_ERROR, "could not find local lock "
                                               "with cookie %u:%llu!\n",
-                                              dlm_get_lock_cookie_node(c),
-                                              dlm_get_lock_cookie_seq(c));
+                                    dlm_get_lock_cookie_node(be64_to_cpu(c)),
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(c)));
+                               __dlm_print_one_lock_resource(res);
                                BUG();
                        }
                        BUG_ON(lock->ml.node != ml->node);
 
+                       if (tmpq != queue) {
+                               mlog(0, "lock was on %u instead of %u for %.*s\n",
+                                    j, ml->list, res->lockname.len, res->lockname.name);
+                               spin_unlock(&res->spinlock);
+                               continue;
+                       }
+
                        /* see NOTE above about why we do not update
                         * to match the master here */
 
@@ -1711,6 +1775,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                        /* do not alter lock refcount.  switching lists. */
                        list_move_tail(&lock->list, queue);
                        spin_unlock(&res->spinlock);
+                       added++;
 
                        mlog(0, "just reordered a local lock!\n");
                        continue;
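A subtlety in the queue scan above: with list_for_each_entry() the loop variable is also the cursor, so assigning lock = NULL inside the body (as earlier versions of this conversion did) makes the macro dereference NULL when it steps to the next element. The safe idioms are either a separate struct list_head iterator or letting the loop run off the end and detecting that, as the fixed hunk does:

    /* BROKEN: cursor nulled mid-walk; the macro's next step
     * dereferences NULL on the following iteration */
    list_for_each_entry(lock, tmpq, list) {
            if (lock->ml.cookie != ml->cookie)
                    lock = NULL;
            else
                    break;
    }

    /* OK: finish the walk, then detect "not found" because the
     * cursor has wrapped around to the list head */
    list_for_each_entry(lock, tmpq, list)
            if (lock->ml.cookie == ml->cookie)
                    break;
    if (&lock->list == tmpq)
            lock = NULL;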
@@ -1795,18 +1860,18 @@ skip_lvb:
                spin_lock(&res->spinlock);
                list_for_each_entry(lock, queue, list) {
                        if (lock->ml.cookie == ml->cookie) {
-                               u64 c = lock->ml.cookie;
+                               __be64 c = lock->ml.cookie;
                                mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
                                     "exists on this lockres!\n", dlm->name,
                                     res->lockname.len, res->lockname.name,
-                                    dlm_get_lock_cookie_node(c),
-                                    dlm_get_lock_cookie_seq(c));
+                                    dlm_get_lock_cookie_node(be64_to_cpu(c)),
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(c)));
 
                                mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
                                     "node=%u, cookie=%u:%llu, queue=%d\n",
                                     ml->type, ml->convert_type, ml->node,
-                                    dlm_get_lock_cookie_node(ml->cookie),
-                                    dlm_get_lock_cookie_seq(ml->cookie),
+                                    dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
                                     ml->list);
 
                                __dlm_print_one_lock_resource(res);
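ml->cookie is wire data and stays big-endian (__be64) from sender to receiver; the fix is to annotate it as such and convert with be64_to_cpu() only at the point of use, here when splitting the cookie into node and sequence numbers for the log message. With the __be64 annotation, sparse flags any use of the raw value as host-endian. In brief:

    __be64 c = ml->cookie;           /* exactly as received off the wire */
    u64 cookie = be64_to_cpu(c);     /* host order, safe to decode */

    mlog(ML_ERROR, "cookie %u:%llu\n",
         dlm_get_lock_cookie_node(cookie),
         dlm_get_lock_cookie_seq(cookie));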
@@ -1817,12 +1882,22 @@ skip_lvb:
                if (!bad) {
                        dlm_lock_get(newlock);
                        list_add_tail(&newlock->list, queue);
+                       mlog(0, "%s:%.*s: added lock for node %u, "
+                            "setting refmap bit\n", dlm->name,
+                            res->lockname.len, res->lockname.name, ml->node);
+                       dlm_lockres_set_refmap_bit(ml->node, res);
+                       added++;
                }
                spin_unlock(&res->spinlock);
        }
        mlog(0, "done running all the locks\n");
 
 leave:
+       /* balance the ref taken when the work was queued */
+       spin_lock(&res->spinlock);
+       dlm_lockres_drop_inflight_ref(dlm, res);
+       spin_unlock(&res->spinlock);
+
        if (ret < 0) {
                mlog_errno(ret);
                if (newlock)
@@ -1837,8 +1912,8 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res)
 {
        int i;
-       struct list_head *queue, *iter, *iter2;
-       struct dlm_lock *lock;
+       struct list_head *queue;
+       struct dlm_lock *lock, *next;
 
        res->state |= DLM_LOCK_RES_RECOVERING;
        if (!list_empty(&res->recovering)) {
@@ -1854,8 +1929,7 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
        /* find any pending locks and put them back on proper list */
        for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) {
                queue = dlm_list_idx_to_ptr(res, i);
-               list_for_each_safe(iter, iter2, queue) {
-                       lock = list_entry (iter, struct dlm_lock, list);
+               list_for_each_entry_safe(lock, next, queue, list) {
                        dlm_lock_get(lock);
                        if (lock->convert_pending) {
                                /* move converting lock back to granted */
@@ -1920,24 +1994,23 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                                              u8 dead_node, u8 new_master)
 {
        int i;
-       struct list_head *iter, *iter2;
        struct hlist_node *hash_iter;
        struct hlist_head *bucket;
-
-       struct dlm_lock_resource *res;
+       struct dlm_lock_resource *res, *next;
 
        mlog_entry_void();
 
        assert_spin_locked(&dlm->spinlock);
 
-       list_for_each_safe(iter, iter2, &dlm->reco.resources) {
-               res = list_entry (iter, struct dlm_lock_resource, recovering);
+       list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
                if (res->owner == dead_node) {
                        list_del_init(&res->recovering);
                        spin_lock(&res->spinlock);
+                       /* new_master has our reference from
+                        * the lock state sent during recovery */
                        dlm_change_lockres_owner(dlm, res, new_master);
                        res->state &= ~DLM_LOCK_RES_RECOVERING;
-                       if (!__dlm_lockres_unused(res))
+                       if (__dlm_lockres_has_locks(res))
                                __dlm_dirty_lockres(dlm, res);
                        spin_unlock(&res->spinlock);
                        wake_up(&res->wq);
@@ -1977,9 +2050,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                                        dlm_lockres_put(res);
                                }
                                spin_lock(&res->spinlock);
+                               /* new_master has our reference from
+                                * the lock state sent during recovery */
                                dlm_change_lockres_owner(dlm, res, new_master);
                                res->state &= ~DLM_LOCK_RES_RECOVERING;
-                               if (!__dlm_lockres_unused(res))
+                               if (__dlm_lockres_has_locks(res))
                                        __dlm_dirty_lockres(dlm, res);
                                spin_unlock(&res->spinlock);
                                wake_up(&res->wq);
@@ -2002,7 +2077,7 @@ static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
 static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
                               struct dlm_lock_resource *res, u8 dead_node)
 {
-       struct list_head *iter, *queue;
+       struct list_head *queue;
        struct dlm_lock *lock;
        int blank_lvb = 0, local = 0;
        int i;
@@ -2024,8 +2099,7 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
 
        for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) {
                queue = dlm_list_idx_to_ptr(res, i);
-               list_for_each(iter, queue) {
-                       lock = list_entry (iter, struct dlm_lock, list);
+               list_for_each_entry(lock, queue, list) {
                        if (lock->ml.node == search_node) {
                                if (dlm_lvb_needs_invalidation(lock, local)) {
                                        /* zero the lksb lvb and lockres lvb */
@@ -2046,8 +2120,8 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
 static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
                                struct dlm_lock_resource *res, u8 dead_node)
 {
-       struct list_head *iter, *tmpiter;
-       struct dlm_lock *lock;
+       struct dlm_lock *lock, *next;
+       unsigned int freed = 0;
 
        /* this node is the lockres master:
         * 1) remove any stale locks for the dead node
@@ -2057,28 +2131,41 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
        assert_spin_locked(&res->spinlock);
 
        /* TODO: check pending_asts, pending_basts here */
-       list_for_each_safe(iter, tmpiter, &res->granted) {
-               lock = list_entry (iter, struct dlm_lock, list);
+       list_for_each_entry_safe(lock, next, &res->granted, list) {
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
-       list_for_each_safe(iter, tmpiter, &res->converting) {
-               lock = list_entry (iter, struct dlm_lock, list);
+       list_for_each_entry_safe(lock, next, &res->converting, list) {
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
-       list_for_each_safe(iter, tmpiter, &res->blocked) {
-               lock = list_entry (iter, struct dlm_lock, list);
+       list_for_each_entry_safe(lock, next, &res->blocked, list) {
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
 
+       if (freed) {
+               mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
+                    "dropping ref from lockres\n", dlm->name,
+                    res->lockname.len, res->lockname.name, freed, dead_node);
+               BUG_ON(!test_bit(dead_node, res->refmap));
+               dlm_lockres_clear_refmap_bit(dead_node, res);
+       } else if (test_bit(dead_node, res->refmap)) {
+               mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+                    "no locks and had not purged before dying\n", dlm->name,
+                    res->lockname.len, res->lockname.name, dead_node);
+               dlm_lockres_clear_refmap_bit(dead_node, res);
+       }
+
        /* do not kick thread yet */
        __dlm_dirty_lockres(dlm, res);
 }
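Freeing a dead node's locks must also clear that node's bit in the lockres refmap, since the dead node can no longer send the deref message the master would otherwise wait for; the else branch covers a node that died holding only a reference (it had looked up the resource but never purged it). The refmap helpers are defined outside this diff; a hedged sketch of what the clear side presumably looks like, assuming refmap is a per-lockres bitmap guarded by res->spinlock:

    /* sketch; the real helper lives in dlmcommon.h */
    static inline void dlm_lockres_clear_refmap_bit(int bit,
                                            struct dlm_lock_resource *res)
    {
            assert_spin_locked(&res->spinlock);
            clear_bit(bit, res->refmap);
    }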
@@ -2141,9 +2228,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
                        spin_lock(&res->spinlock);
                        /* zero the lvb if necessary */
                        dlm_revalidate_lvb(dlm, res, dead_node);
-                       if (res->owner == dead_node)
+                       if (res->owner == dead_node) {
+                               if (res->state & DLM_LOCK_RES_DROPPING_REF)
+                                       mlog(0, "%s:%.*s: owned by "
+                                            "dead node %u, this node was "
+                                            "dropping its ref when it died. "
+                                            "continue, dropping the flag.\n",
+                                            dlm->name, res->lockname.len,
+                                            res->lockname.name, dead_node);
+
+                               /* the wake_up for this will happen when the
+                                * RECOVERING flag is dropped later */
+                               res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+
                                dlm_move_lockres_to_recovery_list(dlm, res);
-                       else if (res->owner == dlm->node_num) {
+                       } else if (res->owner == dlm->node_num) {
                                dlm_free_dead_locks(dlm, res, dead_node);
                                __dlm_lockres_calc_usage(dlm, res);
                        }
@@ -2480,7 +2579,8 @@ retry:
        return ret;
 }
 
-int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+                          void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
@@ -2608,7 +2708,8 @@ stage2:
        return ret;
 }
 
-int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+                             void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;