#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"
+#include "dlmdebug.h"
#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"
-enum dlm_mle_type {
- DLM_MLE_BLOCK,
- DLM_MLE_MASTER,
- DLM_MLE_MIGRATION
-};
-
-struct dlm_lock_name
-{
- u8 len;
- u8 name[DLM_LOCKID_NAME_MAX];
-};
-
-struct dlm_master_list_entry
-{
- struct list_head list;
- struct list_head hb_events;
- struct dlm_ctxt *dlm;
- spinlock_t spinlock;
- wait_queue_head_t wq;
- atomic_t woken;
- struct kref mle_refs;
- int inuse;
- unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
- unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
- unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
- unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
- u8 master;
- u8 new_master;
- enum dlm_mle_type type;
- struct o2hb_callback_func mle_hb_up;
- struct o2hb_callback_func mle_hb_down;
- union {
- struct dlm_lock_resource *res;
- struct dlm_lock_name name;
- } u;
-};
-
static void dlm_mle_node_down(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle,
struct o2nm_node *node,
return 1;
}
-#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m)
-static void _dlm_print_nodemap(unsigned long *map, const char *mapname)
-{
- int i;
- printk("%s=[ ", mapname);
- for (i=0; i<O2NM_MAX_NODES; i++)
- if (test_bit(i, map))
- printk("%d ", i);
- printk("]");
-}
-
-static void dlm_print_one_mle(struct dlm_master_list_entry *mle)
-{
- int refs;
- char *type;
- char attached;
- u8 master;
- unsigned int namelen;
- const char *name;
- struct kref *k;
- unsigned long *maybe = mle->maybe_map,
- *vote = mle->vote_map,
- *resp = mle->response_map,
- *node = mle->node_map;
-
- k = &mle->mle_refs;
- if (mle->type == DLM_MLE_BLOCK)
- type = "BLK";
- else if (mle->type == DLM_MLE_MASTER)
- type = "MAS";
- else
- type = "MIG";
- refs = atomic_read(&k->refcount);
- master = mle->master;
- attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
-
- if (mle->type != DLM_MLE_MASTER) {
- namelen = mle->u.name.len;
- name = mle->u.name.name;
- } else {
- namelen = mle->u.res->lockname.len;
- name = mle->u.res->lockname.name;
- }
-
- mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
- namelen, name, type, refs, master, mle->new_master, attached,
- mle->inuse);
- dlm_print_nodemap(maybe);
- printk(", ");
- dlm_print_nodemap(vote);
- printk(", ");
- dlm_print_nodemap(resp);
- printk(", ");
- dlm_print_nodemap(node);
- printk(", ");
- printk("\n");
-}
-
-#if 0
-/* Code here is included but defined out as it aids debugging */
-
-static void dlm_dump_mles(struct dlm_ctxt *dlm)
-{
- struct dlm_master_list_entry *mle;
-
- mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
- spin_lock(&dlm->master_lock);
- list_for_each_entry(mle, &dlm->master_list, list)
- dlm_print_one_mle(mle);
- spin_unlock(&dlm->master_lock);
-}
-
-int dlm_dump_all_mles(const char __user *data, unsigned int len)
-{
- struct dlm_ctxt *dlm;
-
- spin_lock(&dlm_domain_lock);
- list_for_each_entry(dlm, &dlm_domains, list) {
- mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
- dlm_dump_mles(dlm);
- }
- spin_unlock(&dlm_domain_lock);
- return len;
-}
-EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
-
-#endif /* 0 */
-
-
+static struct kmem_cache *dlm_lockres_cache = NULL;
+static struct kmem_cache *dlm_lockname_cache = NULL;
static struct kmem_cache *dlm_mle_cache = NULL;
-
static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
enum dlm_mle_type type,
int dlm_init_mle_cache(void)
{
- dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
+ dlm_mle_cache = kmem_cache_create("o2dlm_mle",
sizeof(struct dlm_master_list_entry),
0, SLAB_HWCACHE_ALIGN,
NULL);
* LOCK RESOURCE FUNCTIONS
*/
+int dlm_init_master_caches(void)
+{
+ dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
+ sizeof(struct dlm_lock_resource),
+ 0, SLAB_HWCACHE_ALIGN, NULL);
+ if (!dlm_lockres_cache)
+ goto bail;
+
+ dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
+ DLM_LOCKID_NAME_MAX, 0,
+ SLAB_HWCACHE_ALIGN, NULL);
+ if (!dlm_lockname_cache)
+ goto bail;
+
+ return 0;
+bail:
+ dlm_destroy_master_caches();
+ return -ENOMEM;
+}
+
+void dlm_destroy_master_caches(void)
+{
+ if (dlm_lockname_cache)
+ kmem_cache_destroy(dlm_lockname_cache);
+
+ if (dlm_lockres_cache)
+ kmem_cache_destroy(dlm_lockres_cache);
+}
+
static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 owner)
static void dlm_lockres_release(struct kref *kref)
{
struct dlm_lock_resource *res;
+ struct dlm_ctxt *dlm;
res = container_of(kref, struct dlm_lock_resource, refs);
+ dlm = res->dlm;
/* This should not happen -- all lockres' have a name
* associated with them at init time. */
mlog(0, "destroying lockres %.*s\n", res->lockname.len,
res->lockname.name);
+ spin_lock(&dlm->track_lock);
+ if (!list_empty(&res->tracking))
+ list_del_init(&res->tracking);
+ else {
+ mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
+ res->lockname.len, res->lockname.name);
+ dlm_print_one_lock_resource(res);
+ }
+ spin_unlock(&dlm->track_lock);
+
+ dlm_put(dlm);
+
if (!hlist_unhashed(&res->hash_node) ||
!list_empty(&res->granted) ||
!list_empty(&res->converting) ||
BUG_ON(!list_empty(&res->recovering));
BUG_ON(!list_empty(&res->purge));
- kfree(res->lockname.name);
+ kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
- kfree(res);
+ kmem_cache_free(dlm_lockres_cache, res);
}
void dlm_lockres_put(struct dlm_lock_resource *res)
INIT_LIST_HEAD(&res->dirty);
INIT_LIST_HEAD(&res->recovering);
INIT_LIST_HEAD(&res->purge);
+ INIT_LIST_HEAD(&res->tracking);
atomic_set(&res->asts_reserved, 0);
res->migration_pending = 0;
res->inflight_locks = 0;
+ /* put in dlm_lockres_release */
+ dlm_grab(dlm);
+ res->dlm = dlm;
+
kref_init(&res->refs);
/* just for consistency */
res->last_used = 0;
+ spin_lock(&dlm->spinlock);
+ list_add_tail(&res->tracking, &dlm->tracking_list);
+ spin_unlock(&dlm->spinlock);
+
memset(res->lvb, 0, DLM_LVB_LEN);
memset(res->refmap, 0, sizeof(res->refmap));
}
const char *name,
unsigned int namelen)
{
- struct dlm_lock_resource *res;
+ struct dlm_lock_resource *res = NULL;
- res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS);
+ res = (struct dlm_lock_resource *)
+ kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
if (!res)
- return NULL;
+ goto error;
- res->lockname.name = kmalloc(namelen, GFP_NOFS);
- if (!res->lockname.name) {
- kfree(res);
- return NULL;
- }
+ res->lockname.name = (char *)
+ kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
+ if (!res->lockname.name)
+ goto error;
dlm_init_lockres(dlm, res, name, namelen);
return res;
+
+error:
+ if (res && res->lockname.name)
+ kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
+
+ if (res)
+ kmem_cache_free(dlm_lockres_cache, res);
+ return NULL;
}
void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
if (tmpres) {
int dropping_ref = 0;
+ spin_unlock(&dlm->spinlock);
+
spin_lock(&tmpres->spinlock);
+ /* We wait for the other thread that is mastering the resource */
+ if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
+ __dlm_wait_on_lockres(tmpres);
+ BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
+ }
+
if (tmpres->owner == dlm->node_num) {
BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
dlm_lockres_grab_inflight_ref(dlm, tmpres);
} else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
dropping_ref = 1;
spin_unlock(&tmpres->spinlock);
- spin_unlock(&dlm->spinlock);
/* wait until done messaging the master, drop our ref to allow
* the lockres to be purged, start over. */
if (!mle) {
if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
res->owner != assert->node_idx) {
- mlog(ML_ERROR, "assert_master from "
- "%u, but current owner is "
- "%u! (%.*s)\n",
- assert->node_idx, res->owner,
- namelen, name);
- goto kill;
+ mlog(ML_ERROR, "DIE! Mastery assert from %u, "
+ "but current owner is %u! (%.*s)\n",
+ assert->node_idx, res->owner, namelen,
+ name);
+ __dlm_print_one_lock_resource(res);
+ BUG();
}
} else if (mle->type != DLM_MLE_MIGRATION) {
if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
struct dlm_node_iter *iter)
{
struct dlm_migrate_request migrate;
- int ret, status = 0;
+ int ret, skip, status = 0;
int nodenum;
memset(&migrate, 0, sizeof(migrate));
nodenum == new_master)
continue;
+ /* We could race exit domain. If exited, skip. */
+ spin_lock(&dlm->spinlock);
+ skip = (!test_bit(nodenum, dlm->domain_map));
+ spin_unlock(&dlm->spinlock);
+ if (skip) {
+ clear_bit(nodenum, iter->node_map);
+ continue;
+ }
+
ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
&migrate, sizeof(migrate), nodenum,
&status);
- if (ret < 0)
- mlog_errno(ret);
- else if (status < 0) {
+ if (ret < 0) {
+ mlog(0, "migrate_request returned %d!\n", ret);
+ if (!dlm_is_host_down(ret)) {
+ mlog(ML_ERROR, "unhandled error=%d!\n", ret);
+ BUG();
+ }
+ clear_bit(nodenum, iter->node_map);
+ ret = 0;
+ } else if (status < 0) {
mlog(0, "migrate request (node %u) returned %d!\n",
nodenum, status);
ret = status;