]> pilppa.org Git - linux-2.6-omap-h63xx.git/blobdiff - fs/ocfs2/dlm/dlmmaster.c
Merge branches 'topic/slob/cleanups', 'topic/slob/fixes', 'topic/slub/core', 'topic...
[linux-2.6-omap-h63xx.git] / fs / ocfs2 / dlm / dlmmaster.c
index ea6b8957786062ad91ab213155fc44ea078724b1..0a2813947853dfd68372506b9fb43de894c3d577 100644 (file)
 #include "dlmapi.h"
 #include "dlmcommon.h"
 #include "dlmdomain.h"
+#include "dlmdebug.h"
 
 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
 #include "cluster/masklog.h"
 
-enum dlm_mle_type {
-       DLM_MLE_BLOCK,
-       DLM_MLE_MASTER,
-       DLM_MLE_MIGRATION
-};
-
-struct dlm_lock_name
-{
-       u8 len;
-       u8 name[DLM_LOCKID_NAME_MAX];
-};
-
-struct dlm_master_list_entry
-{
-       struct list_head list;
-       struct list_head hb_events;
-       struct dlm_ctxt *dlm;
-       spinlock_t spinlock;
-       wait_queue_head_t wq;
-       atomic_t woken;
-       struct kref mle_refs;
-       int inuse;
-       unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
-       unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
-       unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
-       unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
-       u8 master;
-       u8 new_master;
-       enum dlm_mle_type type;
-       struct o2hb_callback_func mle_hb_up;
-       struct o2hb_callback_func mle_hb_down;
-       union {
-               struct dlm_lock_resource *res;
-               struct dlm_lock_name name;
-       } u;
-};
-
 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node,
@@ -128,98 +92,10 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
        return 1;
 }
 
-#define dlm_print_nodemap(m)  _dlm_print_nodemap(m,#m)
-static void _dlm_print_nodemap(unsigned long *map, const char *mapname)
-{
-       int i;
-       printk("%s=[ ", mapname);
-       for (i=0; i<O2NM_MAX_NODES; i++)
-               if (test_bit(i, map))
-                       printk("%d ", i);
-       printk("]");
-}
-
-static void dlm_print_one_mle(struct dlm_master_list_entry *mle)
-{
-       int refs;
-       char *type;
-       char attached;
-       u8 master;
-       unsigned int namelen;
-       const char *name;
-       struct kref *k;
-       unsigned long *maybe = mle->maybe_map,
-                     *vote = mle->vote_map,
-                     *resp = mle->response_map,
-                     *node = mle->node_map;
-
-       k = &mle->mle_refs;
-       if (mle->type == DLM_MLE_BLOCK)
-               type = "BLK";
-       else if (mle->type == DLM_MLE_MASTER)
-               type = "MAS";
-       else
-               type = "MIG";
-       refs = atomic_read(&k->refcount);
-       master = mle->master;
-       attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
-
-       if (mle->type != DLM_MLE_MASTER) {
-               namelen = mle->u.name.len;
-               name = mle->u.name.name;
-       } else {
-               namelen = mle->u.res->lockname.len;
-               name = mle->u.res->lockname.name;
-       }
-
-       mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
-                 namelen, name, type, refs, master, mle->new_master, attached,
-                 mle->inuse);
-       dlm_print_nodemap(maybe);
-       printk(", ");
-       dlm_print_nodemap(vote);
-       printk(", ");
-       dlm_print_nodemap(resp);
-       printk(", ");
-       dlm_print_nodemap(node);
-       printk(", ");
-       printk("\n");
-}
-
-#if 0
-/* Code here is included but defined out as it aids debugging */
-
-static void dlm_dump_mles(struct dlm_ctxt *dlm)
-{
-       struct dlm_master_list_entry *mle;
-       
-       mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
-       spin_lock(&dlm->master_lock);
-       list_for_each_entry(mle, &dlm->master_list, list)
-               dlm_print_one_mle(mle);
-       spin_unlock(&dlm->master_lock);
-}
-
-int dlm_dump_all_mles(const char __user *data, unsigned int len)
-{
-       struct dlm_ctxt *dlm;
-
-       spin_lock(&dlm_domain_lock);
-       list_for_each_entry(dlm, &dlm_domains, list) {
-               mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
-               dlm_dump_mles(dlm);
-       }
-       spin_unlock(&dlm_domain_lock);
-       return len;
-}
-EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
-
-#endif  /*  0  */
-
-
+static struct kmem_cache *dlm_lockres_cache = NULL;
+static struct kmem_cache *dlm_lockname_cache = NULL;
 static struct kmem_cache *dlm_mle_cache = NULL;
 
-
 static void dlm_mle_release(struct kref *kref);
 static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
@@ -507,7 +383,7 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm,
 
 int dlm_init_mle_cache(void)
 {
-       dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
+       dlm_mle_cache = kmem_cache_create("o2dlm_mle",
                                          sizeof(struct dlm_master_list_entry),
                                          0, SLAB_HWCACHE_ALIGN,
                                          NULL);
@@ -560,6 +436,35 @@ static void dlm_mle_release(struct kref *kref)
  * LOCK RESOURCE FUNCTIONS
  */
 
+int dlm_init_master_caches(void)
+{
+       dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
+                                             sizeof(struct dlm_lock_resource),
+                                             0, SLAB_HWCACHE_ALIGN, NULL);
+       if (!dlm_lockres_cache)
+               goto bail;
+
+       dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
+                                              DLM_LOCKID_NAME_MAX, 0,
+                                              SLAB_HWCACHE_ALIGN, NULL);
+       if (!dlm_lockname_cache)
+               goto bail;
+
+       return 0;
+bail:
+       dlm_destroy_master_caches();
+       return -ENOMEM;
+}
+
+void dlm_destroy_master_caches(void)
+{
+       if (dlm_lockname_cache)
+               kmem_cache_destroy(dlm_lockname_cache);
+
+       if (dlm_lockres_cache)
+               kmem_cache_destroy(dlm_lockres_cache);
+}
+
 static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *res,
                                  u8 owner)
@@ -600,8 +505,10 @@ void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
 static void dlm_lockres_release(struct kref *kref)
 {
        struct dlm_lock_resource *res;
+       struct dlm_ctxt *dlm;
 
        res = container_of(kref, struct dlm_lock_resource, refs);
+       dlm = res->dlm;
 
        /* This should not happen -- all lockres' have a name
         * associated with them at init time. */
@@ -610,6 +517,18 @@ static void dlm_lockres_release(struct kref *kref)
        mlog(0, "destroying lockres %.*s\n", res->lockname.len,
             res->lockname.name);
 
+       spin_lock(&dlm->track_lock);
+       if (!list_empty(&res->tracking))
+               list_del_init(&res->tracking);
+       else {
+               mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
+                    res->lockname.len, res->lockname.name);
+               dlm_print_one_lock_resource(res);
+       }
+       spin_unlock(&dlm->track_lock);
+
+       dlm_put(dlm);
+
        if (!hlist_unhashed(&res->hash_node) ||
            !list_empty(&res->granted) ||
            !list_empty(&res->converting) ||
@@ -642,9 +561,9 @@ static void dlm_lockres_release(struct kref *kref)
        BUG_ON(!list_empty(&res->recovering));
        BUG_ON(!list_empty(&res->purge));
 
-       kfree(res->lockname.name);
+       kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
 
-       kfree(res);
+       kmem_cache_free(dlm_lockres_cache, res);
 }
 
 void dlm_lockres_put(struct dlm_lock_resource *res)
@@ -677,10 +596,15 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
        INIT_LIST_HEAD(&res->dirty);
        INIT_LIST_HEAD(&res->recovering);
        INIT_LIST_HEAD(&res->purge);
+       INIT_LIST_HEAD(&res->tracking);
        atomic_set(&res->asts_reserved, 0);
        res->migration_pending = 0;
        res->inflight_locks = 0;
 
+       /* put in dlm_lockres_release */
+       dlm_grab(dlm);
+       res->dlm = dlm;
+
        kref_init(&res->refs);
 
        /* just for consistency */
@@ -692,6 +616,10 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
 
        res->last_used = 0;
 
+       spin_lock(&dlm->spinlock);
+       list_add_tail(&res->tracking, &dlm->tracking_list);
+       spin_unlock(&dlm->spinlock);
+
        memset(res->lvb, 0, DLM_LVB_LEN);
        memset(res->refmap, 0, sizeof(res->refmap));
 }
@@ -700,20 +628,28 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
                                   const char *name,
                                   unsigned int namelen)
 {
-       struct dlm_lock_resource *res;
+       struct dlm_lock_resource *res = NULL;
 
-       res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS);
+       res = (struct dlm_lock_resource *)
+                               kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
        if (!res)
-               return NULL;
+               goto error;
 
-       res->lockname.name = kmalloc(namelen, GFP_NOFS);
-       if (!res->lockname.name) {
-               kfree(res);
-               return NULL;
-       }
+       res->lockname.name = (char *)
+                               kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
+       if (!res->lockname.name)
+               goto error;
 
        dlm_init_lockres(dlm, res, name, namelen);
        return res;
+
+error:
+       if (res && res->lockname.name)
+               kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
+
+       if (res)
+               kmem_cache_free(dlm_lockres_cache, res);
+       return NULL;
 }
 
 void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
@@ -796,14 +732,21 @@ lookup:
        if (tmpres) {
                int dropping_ref = 0;
 
+               spin_unlock(&dlm->spinlock);
+
                spin_lock(&tmpres->spinlock);
+               /* We wait for the other thread that is mastering the resource */
+               if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
+                       __dlm_wait_on_lockres(tmpres);
+                       BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
+               }
+
                if (tmpres->owner == dlm->node_num) {
                        BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
                        dlm_lockres_grab_inflight_ref(dlm, tmpres);
                } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
                        dropping_ref = 1;
                spin_unlock(&tmpres->spinlock);
-               spin_unlock(&dlm->spinlock);
 
                /* wait until done messaging the master, drop our ref to allow
                 * the lockres to be purged, start over. */
@@ -1906,12 +1849,12 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
                if (!mle) {
                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
                            res->owner != assert->node_idx) {
-                               mlog(ML_ERROR, "assert_master from "
-                                         "%u, but current owner is "
-                                         "%u! (%.*s)\n",
-                                      assert->node_idx, res->owner,
-                                      namelen, name);
-                               goto kill;
+                               mlog(ML_ERROR, "DIE! Mastery assert from %u, "
+                                    "but current owner is %u! (%.*s)\n",
+                                    assert->node_idx, res->owner, namelen,
+                                    name);
+                               __dlm_print_one_lock_resource(res);
+                               BUG();
                        }
                } else if (mle->type != DLM_MLE_MIGRATION) {
                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
@@ -3023,7 +2966,7 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
                                  struct dlm_node_iter *iter)
 {
        struct dlm_migrate_request migrate;
-       int ret, status = 0;
+       int ret, skip, status = 0;
        int nodenum;
 
        memset(&migrate, 0, sizeof(migrate));
@@ -3040,12 +2983,27 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
                    nodenum == new_master)
                        continue;
 
+               /* We could race exit domain. If exited, skip. */
+               spin_lock(&dlm->spinlock);
+               skip = (!test_bit(nodenum, dlm->domain_map));
+               spin_unlock(&dlm->spinlock);
+               if (skip) {
+                       clear_bit(nodenum, iter->node_map);
+                       continue;
+               }
+
                ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
                                         &migrate, sizeof(migrate), nodenum,
                                         &status);
-               if (ret < 0)
-                       mlog_errno(ret);
-               else if (status < 0) {
+               if (ret < 0) {
+                       mlog(0, "migrate_request returned %d!\n", ret);
+                       if (!dlm_is_host_down(ret)) {
+                               mlog(ML_ERROR, "unhandled error=%d!\n", ret);
+                               BUG();
+                       }
+                       clear_bit(nodenum, iter->node_map);
+                       ret = 0;
+               } else if (status < 0) {
                        mlog(0, "migrate request (node %u) returned %d!\n",
                             nodenum, status);
                        ret = status;