[2.6 patch] ocfs2: make dlm_do_assert_master() static
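Although the subject line only names the static conversion of dlm_do_assert_master(), this blobdiff spans several changes between the two blob versions: the list_for_each() to list_for_each_entry() cleanups, the updated kmem_cache_create() signature (the destructor argument was dropped), two mlog() format-string fixes, the new dlm_is_lockres_migrateable() helper, and the DLM_LOCK_RES_SETREF_INPROG flag that serializes deref requests against an in-flight master assert.

The sketch below is illustrative only and is not part of the patch; the type and names (demo_res, demo_assert_master, demo_deref_handler) are simplified stand-ins for struct dlm_lock_resource and the real handlers. It shows the handshake the flag implements: the assert path raises the flag around its cluster messaging, and the deref handler defers to a worker whenever it observes the flag set.

    #include <linux/spinlock.h>
    #include <linux/wait.h>

    #define DEMO_SETREF_INPROG	0x00000001

    struct demo_res {
    	spinlock_t lock;		/* stands in for res->spinlock */
    	unsigned long state;		/* stands in for res->state */
    	wait_queue_head_t wq;		/* stands in for res->wq */
    };

    /* Assert path: mark the ref update in progress, do the (sleeping)
     * cluster messaging, then clear the flag and wake any waiters. */
    static void demo_assert_master(struct demo_res *res)
    {
    	spin_lock(&res->lock);
    	res->state |= DEMO_SETREF_INPROG;
    	spin_unlock(&res->lock);

    	/* ... send assert_master messages to the other nodes ... */

    	spin_lock(&res->lock);
    	res->state &= ~DEMO_SETREF_INPROG;
    	spin_unlock(&res->lock);
    	wake_up(&res->wq);
    }

    /* Deref path: if an assert is racing, report that the caller must
     * dispatch the deref to a worker instead of dropping the ref now. */
    static int demo_deref_handler(struct demo_res *res)
    {
    	int dispatch = 0;

    	spin_lock(&res->lock);
    	if (res->state & DEMO_SETREF_INPROG)
    		dispatch = 1;	/* defer: queue a work item */
    	else
    		;		/* safe to clear the refmap bit here */
    	spin_unlock(&res->lock);

    	return dispatch;
    }

In the real patch, the deferred case queues a dlm_work_item that runs dlm_deref_lockres_worker(), which waits for the flag to clear via __dlm_wait_on_lockres_flags() before clearing the refmap bit.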
fs/ocfs2/dlm/dlmmaster.c
index bd1268778b660b5b54e28589f33ada63e2445540..c92d1b19fc0bbb51c0c98cbe27847f98ac6fce10 100644 (file)
@@ -102,6 +102,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
                                struct dlm_lock_resource *res,
                                void *nodemap, u32 flags);
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
 
 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
                                struct dlm_master_list_entry *mle,
@@ -191,25 +192,20 @@ static void dlm_print_one_mle(struct dlm_master_list_entry *mle)
 static void dlm_dump_mles(struct dlm_ctxt *dlm)
 {
        struct dlm_master_list_entry *mle;
-       struct list_head *iter;
        
        mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
        spin_lock(&dlm->master_lock);
-       list_for_each(iter, &dlm->master_list) {
-               mle = list_entry(iter, struct dlm_master_list_entry, list);
+       list_for_each_entry(mle, &dlm->master_list, list)
                dlm_print_one_mle(mle);
-       }
        spin_unlock(&dlm->master_lock);
 }
 
 int dlm_dump_all_mles(const char __user *data, unsigned int len)
 {
-       struct list_head *iter;
        struct dlm_ctxt *dlm;
 
        spin_lock(&dlm_domain_lock);
-       list_for_each(iter, &dlm_domains) {
-               dlm = list_entry (iter, struct dlm_ctxt, list);
+       list_for_each_entry(dlm, &dlm_domains, list) {
                mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
                dlm_dump_mles(dlm);
        }
@@ -453,12 +449,10 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
                        char *name, unsigned int namelen)
 {
        struct dlm_master_list_entry *tmpmle;
-       struct list_head *iter;
 
        assert_spin_locked(&dlm->master_lock);
 
-       list_for_each(iter, &dlm->master_list) {
-               tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
+       list_for_each_entry(tmpmle, &dlm->master_list, list) {
                if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
                        continue;
                dlm_get_mle(tmpmle);
@@ -471,13 +465,10 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
 {
        struct dlm_master_list_entry *mle;
-       struct list_head *iter;
 
        assert_spin_locked(&dlm->spinlock);
        
-       list_for_each(iter, &dlm->mle_hb_events) {
-               mle = list_entry(iter, struct dlm_master_list_entry, 
-                                hb_events);
+       list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
                if (node_up)
                        dlm_mle_node_up(dlm, mle, NULL, idx);
                else
@@ -519,7 +510,7 @@ int dlm_init_mle_cache(void)
        dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
                                          sizeof(struct dlm_master_list_entry),
                                          0, SLAB_HWCACHE_ALIGN,
-                                         NULL, NULL);
+                                         NULL);
        if (dlm_mle_cache == NULL)
                return -ENOMEM;
        return 0;
@@ -917,7 +908,7 @@ lookup:
                 * but they might own this lockres.  wait on them. */
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
-                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
                             "recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
@@ -971,7 +962,7 @@ redo_request:
                spin_lock(&dlm->spinlock);
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
-                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
                             "recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
@@ -1704,9 +1695,9 @@ send_response:
  * can periodically run all locks owned by this node
  * and re-assert across the cluster...
  */
-int dlm_do_assert_master(struct dlm_ctxt *dlm,
-                        struct dlm_lock_resource *res,
-                        void *nodemap, u32 flags)
+static int dlm_do_assert_master(struct dlm_ctxt *dlm,
+                               struct dlm_lock_resource *res,
+                               void *nodemap, u32 flags)
 {
        struct dlm_assert_master assert;
        int to, tmpret;
@@ -1717,6 +1708,11 @@ int dlm_do_assert_master(struct dlm_ctxt *dlm,
        unsigned int namelen = res->lockname.len;
 
        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+       spin_lock(&res->spinlock);
+       res->state |= DLM_LOCK_RES_SETREF_INPROG;
+       spin_unlock(&res->spinlock);
+
 again:
        reassert = 0;
 
@@ -1789,6 +1785,11 @@ again:
        if (reassert)
                goto again;
 
+       spin_lock(&res->spinlock);
+       res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+       spin_unlock(&res->spinlock);
+       wake_up(&res->wq);
+
        return ret;
 }
 
@@ -2036,8 +2037,12 @@ ok:
 
 done:
        ret = 0;
-       if (res)
-               dlm_lockres_put(res);
+       if (res) {
+               spin_lock(&res->spinlock);
+               res->state |= DLM_LOCK_RES_SETREF_INPROG;
+               spin_unlock(&res->spinlock);
+               *ret_data = (void *)res;
+       }
        dlm_put(dlm);
        if (master_request) {
                mlog(0, "need to tell master to reassert\n");
@@ -2064,11 +2069,25 @@ kill:
        __dlm_print_one_lock_resource(res);
        spin_unlock(&res->spinlock);
        spin_unlock(&dlm->spinlock);
-       dlm_lockres_put(res);
+       *ret_data = (void *)res; 
        dlm_put(dlm);
        return -EINVAL;
 }
 
+void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
+{
+       struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
+
+       if (ret_data) {
+               spin_lock(&res->spinlock);
+               res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+               spin_unlock(&res->spinlock);
+               wake_up(&res->wq);
+               dlm_lockres_put(res);
+       }
+       return;
+}
+
 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
                               struct dlm_lock_resource *res,
                               int ignore_higher, u8 request_from, u32 flags)
@@ -2278,6 +2297,9 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
        int ret = -EINVAL;
        u8 node;
        unsigned int hash;
+       struct dlm_work_item *item;
+       int cleared = 0;
+       int dispatch = 0;
 
        if (!dlm_grab(dlm))
                return 0;
@@ -2308,27 +2330,140 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
        spin_unlock(&dlm->spinlock);
 
        spin_lock(&res->spinlock);
-       BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
-       if (test_bit(node, res->refmap)) {
-               ret = 0;
-               dlm_lockres_clear_refmap_bit(node, res);
-       } else {
-               mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
-                    "but it is already dropped!\n", dlm->name, namelen,
-                    name, node);
-               __dlm_print_one_lock_resource(res);
+       if (res->state & DLM_LOCK_RES_SETREF_INPROG)
+               dispatch = 1;
+       else {
+               BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+               if (test_bit(node, res->refmap)) {
+                       dlm_lockres_clear_refmap_bit(node, res);
+                       cleared = 1;
+               }
        }
        spin_unlock(&res->spinlock);
 
-       if (!ret)
-               dlm_lockres_calc_usage(dlm, res);
+       if (!dispatch) {
+               if (cleared)
+                       dlm_lockres_calc_usage(dlm, res);
+               else {
+                       mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+                       "but it is already dropped!\n", dlm->name,
+                       res->lockname.len, res->lockname.name, node);
+                       __dlm_print_one_lock_resource(res);
+               }
+               ret = 0;
+               goto done;
+       }
+
+       item = kzalloc(sizeof(*item), GFP_NOFS);
+       if (!item) {
+               ret = -ENOMEM;
+               mlog_errno(ret);
+               goto done;
+       }
+
+       dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
+       item->u.dl.deref_res = res;
+       item->u.dl.deref_node = node;
+
+       spin_lock(&dlm->work_lock);
+       list_add_tail(&item->list, &dlm->work_list);
+       spin_unlock(&dlm->work_lock);
+
+       queue_work(dlm->dlm_worker, &dlm->dispatched_work);
+       return 0;
+
 done:
        if (res)
                dlm_lockres_put(res);
        dlm_put(dlm);
+
        return ret;
 }
 
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
+{
+       struct dlm_ctxt *dlm;
+       struct dlm_lock_resource *res;
+       u8 node;
+       u8 cleared = 0;
+
+       dlm = item->dlm;
+       res = item->u.dl.deref_res;
+       node = item->u.dl.deref_node;
+
+       spin_lock(&res->spinlock);
+       BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+       if (test_bit(node, res->refmap)) {
+               __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
+               dlm_lockres_clear_refmap_bit(node, res);
+               cleared = 1;
+       }
+       spin_unlock(&res->spinlock);
+
+       if (cleared) {
+               mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
+                    dlm->name, res->lockname.len, res->lockname.name, node);
+               dlm_lockres_calc_usage(dlm, res);
+       } else {
+               mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+                    "but it is already dropped!\n", dlm->name,
+                    res->lockname.len, res->lockname.name, node);
+               __dlm_print_one_lock_resource(res);
+       }
+
+       dlm_lockres_put(res);
+}
+
+/* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
+ * if not. If 0, numlocks is set to the number of locks in the lockres.
+ */
+static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
+                                     struct dlm_lock_resource *res,
+                                     int *numlocks)
+{
+       int ret;
+       int i;
+       int count = 0;
+       struct list_head *queue;
+       struct dlm_lock *lock;
+
+       assert_spin_locked(&res->spinlock);
+
+       ret = -EINVAL;
+       if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
+               mlog(0, "cannot migrate lockres with unknown owner!\n");
+               goto leave;
+       }
+
+       if (res->owner != dlm->node_num) {
+               mlog(0, "cannot migrate lockres this node doesn't own!\n");
+               goto leave;
+       }
+
+       ret = 0;
+       queue = &res->granted;
+       for (i = 0; i < 3; i++) {
+               list_for_each_entry(lock, queue, list) {
+                       ++count;
+                       if (lock->ml.node == dlm->node_num) {
+                               mlog(0, "found a lock owned by this node still "
+                                    "on the %s queue!  will not migrate this "
+                                    "lockres\n", (i == 0 ? "granted" :
+                                                  (i == 1 ? "converting" :
+                                                   "blocked")));
+                               ret = -ENOTEMPTY;
+                               goto leave;
+                       }
+               }
+               queue++;
+       }
+
+       *numlocks = count;
+       mlog(0, "migrateable lockres having %d locks\n", *numlocks);
+
+leave:
+       return ret;
+}
 
 /*
  * DLM_MIGRATE_LOCKRES
@@ -2342,14 +2477,12 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
        struct dlm_master_list_entry *mle = NULL;
        struct dlm_master_list_entry *oldmle = NULL;
        struct dlm_migratable_lockres *mres = NULL;
-       int ret = -EINVAL;
+       int ret = 0;
        const char *name;
        unsigned int namelen;
        int mle_added = 0;
-       struct list_head *queue, *iter;
-       int i;
-       struct dlm_lock *lock;
-       int empty = 1, wake = 0;
+       int numlocks;
+       int wake = 0;
 
        if (!dlm_grab(dlm))
                return -EINVAL;
@@ -2363,42 +2496,16 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
         * ensure this lockres is a proper candidate for migration
         */
        spin_lock(&res->spinlock);
-       if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
-               mlog(0, "cannot migrate lockres with unknown owner!\n");
-               spin_unlock(&res->spinlock);
-               goto leave;
-       }
-       if (res->owner != dlm->node_num) {
-               mlog(0, "cannot migrate lockres this node doesn't own!\n");
+       ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
+       if (ret < 0) {
                spin_unlock(&res->spinlock);
                goto leave;
        }
-       mlog(0, "checking queues...\n");
-       queue = &res->granted;
-       for (i=0; i<3; i++) {
-               list_for_each(iter, queue) {
-                       lock = list_entry (iter, struct dlm_lock, list);
-                       empty = 0;
-                       if (lock->ml.node == dlm->node_num) {
-                               mlog(0, "found a lock owned by this node "
-                                    "still on the %s queue!  will not "
-                                    "migrate this lockres\n",
-                                    i==0 ? "granted" :
-                                    (i==1 ? "converting" : "blocked"));
-                               spin_unlock(&res->spinlock);
-                               ret = -ENOTEMPTY;
-                               goto leave;
-                       }
-               }
-               queue++;
-       }
-       mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
        spin_unlock(&res->spinlock);
 
        /* no work to do */
-       if (empty) {
+       if (numlocks == 0) {
                mlog(0, "no locks were found on this lockres! done!\n");
-               ret = 0;
                goto leave;
        }
 
@@ -2634,16 +2741,27 @@ int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 {
        int ret;
        int lock_dropped = 0;
+       int numlocks;
 
+       spin_lock(&res->spinlock);
        if (res->owner != dlm->node_num) {
                if (!__dlm_lockres_unused(res)) {
                        mlog(ML_ERROR, "%s:%.*s: this node is not master, "
                             "trying to free this but locks remain\n",
                             dlm->name, res->lockname.len, res->lockname.name);
                }
+               spin_unlock(&res->spinlock);
                goto leave;
        }
 
+       /* No need to migrate a lockres having no locks */
+       ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
+       if (ret >= 0 && numlocks == 0) {
+               spin_unlock(&res->spinlock);
+               goto leave;
+       }
+       spin_unlock(&res->spinlock);
+
        /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
        spin_unlock(&dlm->spinlock);
        lock_dropped = 1;
@@ -2794,18 +2912,16 @@ again:
 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res)
 {
-       struct list_head *iter, *iter2;
        struct list_head *queue = &res->granted;
        int i, bit;
-       struct dlm_lock *lock;
+       struct dlm_lock *lock, *next;
 
        assert_spin_locked(&res->spinlock);
 
        BUG_ON(res->owner == dlm->node_num);
 
        for (i=0; i<3; i++) {
-               list_for_each_safe(iter, iter2, queue) {
-                       lock = list_entry (iter, struct dlm_lock, list);
+               list_for_each_entry_safe(lock, next, queue, list) {
                        if (lock->ml.node != dlm->node_num) {
                                mlog(0, "putting lock for node %u\n",
                                     lock->ml.node);
@@ -2847,7 +2963,6 @@ static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
 {
        int i;
        struct list_head *queue = &res->granted;
-       struct list_head *iter;
        struct dlm_lock *lock;
        int nodenum;
 
@@ -2855,10 +2970,9 @@ static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
 
        spin_lock(&res->spinlock);
        for (i=0; i<3; i++) {
-               list_for_each(iter, queue) {
+               list_for_each_entry(lock, queue, list) {
                        /* up to the caller to make sure this node
                         * is alive */
-                       lock = list_entry (iter, struct dlm_lock, list);
                        if (lock->ml.node != dlm->node_num) {
                                spin_unlock(&res->spinlock);
                                return lock->ml.node;
@@ -3105,8 +3219,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
 
 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
 {
-       struct list_head *iter, *iter2;
-       struct dlm_master_list_entry *mle;
+       struct dlm_master_list_entry *mle, *next;
        struct dlm_lock_resource *res;
        unsigned int hash;
 
@@ -3116,9 +3229,7 @@ top:
 
        /* clean the master list */
        spin_lock(&dlm->master_lock);
-       list_for_each_safe(iter, iter2, &dlm->master_list) {
-               mle = list_entry(iter, struct dlm_master_list_entry, list);
-
+       list_for_each_entry_safe(mle, next, &dlm->master_list, list) {
                BUG_ON(mle->type != DLM_MLE_BLOCK &&
                       mle->type != DLM_MLE_MASTER &&
                       mle->type != DLM_MLE_MIGRATION);