X-Git-Url: http://pilppa.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=fs%2Focfs2%2Fdlm%2Fdlmmaster.c;h=77e4e6169a0df2d01c0adc0fb091038b4f53bc7b;hb=05488bbebe3deedbc5d58a1832f563ff96bc2ef6;hp=bd1268778b660b5b54e28589f33ada63e2445540;hpb=d74c9803a90d733f5fb7270475aa6d14b45796c6;p=linux-2.6-omap-h63xx.git diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index bd1268778b6..c92d1b19fc0 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -102,6 +102,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data); static int dlm_do_assert_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, void *nodemap, u32 flags); +static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data); static inline int dlm_mle_equal(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle, @@ -191,25 +192,20 @@ static void dlm_print_one_mle(struct dlm_master_list_entry *mle) static void dlm_dump_mles(struct dlm_ctxt *dlm) { struct dlm_master_list_entry *mle; - struct list_head *iter; mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name); spin_lock(&dlm->master_lock); - list_for_each(iter, &dlm->master_list) { - mle = list_entry(iter, struct dlm_master_list_entry, list); + list_for_each_entry(mle, &dlm->master_list, list) dlm_print_one_mle(mle); - } spin_unlock(&dlm->master_lock); } int dlm_dump_all_mles(const char __user *data, unsigned int len) { - struct list_head *iter; struct dlm_ctxt *dlm; spin_lock(&dlm_domain_lock); - list_for_each(iter, &dlm_domains) { - dlm = list_entry (iter, struct dlm_ctxt, list); + list_for_each_entry(dlm, &dlm_domains, list) { mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name); dlm_dump_mles(dlm); } @@ -453,12 +449,10 @@ static int dlm_find_mle(struct dlm_ctxt *dlm, char *name, unsigned int namelen) { struct dlm_master_list_entry *tmpmle; - struct list_head *iter; assert_spin_locked(&dlm->master_lock); - list_for_each(iter, &dlm->master_list) { - tmpmle = list_entry(iter, struct dlm_master_list_entry, list); + list_for_each_entry(tmpmle, &dlm->master_list, list) { if (!dlm_mle_equal(dlm, tmpmle, name, namelen)) continue; dlm_get_mle(tmpmle); @@ -471,13 +465,10 @@ static int dlm_find_mle(struct dlm_ctxt *dlm, void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up) { struct dlm_master_list_entry *mle; - struct list_head *iter; assert_spin_locked(&dlm->spinlock); - list_for_each(iter, &dlm->mle_hb_events) { - mle = list_entry(iter, struct dlm_master_list_entry, - hb_events); + list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) { if (node_up) dlm_mle_node_up(dlm, mle, NULL, idx); else @@ -519,7 +510,7 @@ int dlm_init_mle_cache(void) dlm_mle_cache = kmem_cache_create("dlm_mle_cache", sizeof(struct dlm_master_list_entry), 0, SLAB_HWCACHE_ALIGN, - NULL, NULL); + NULL); if (dlm_mle_cache == NULL) return -ENOMEM; return 0; @@ -917,7 +908,7 @@ lookup: * but they might own this lockres. wait on them. */ bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); if (bit < O2NM_MAX_NODES) { - mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to" + mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to " "recover before lock mastery can begin\n", dlm->name, namelen, (char *)lockid, bit); wait_on_recovery = 1; @@ -971,7 +962,7 @@ redo_request: spin_lock(&dlm->spinlock); bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); if (bit < O2NM_MAX_NODES) { - mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to" + mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to " "recover before lock mastery can begin\n", dlm->name, namelen, (char *)lockid, bit); wait_on_recovery = 1; @@ -1704,9 +1695,9 @@ send_response: * can periodically run all locks owned by this node * and re-assert across the cluster... */ -int dlm_do_assert_master(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - void *nodemap, u32 flags) +static int dlm_do_assert_master(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + void *nodemap, u32 flags) { struct dlm_assert_master assert; int to, tmpret; @@ -1717,6 +1708,11 @@ int dlm_do_assert_master(struct dlm_ctxt *dlm, unsigned int namelen = res->lockname.len; BUG_ON(namelen > O2NM_MAX_NAME_LEN); + + spin_lock(&res->spinlock); + res->state |= DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + again: reassert = 0; @@ -1789,6 +1785,11 @@ again: if (reassert) goto again; + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + wake_up(&res->wq); + return ret; } @@ -2036,8 +2037,12 @@ ok: done: ret = 0; - if (res) - dlm_lockres_put(res); + if (res) { + spin_lock(&res->spinlock); + res->state |= DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + *ret_data = (void *)res; + } dlm_put(dlm); if (master_request) { mlog(0, "need to tell master to reassert\n"); @@ -2064,11 +2069,25 @@ kill: __dlm_print_one_lock_resource(res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); - dlm_lockres_put(res); + *ret_data = (void *)res; dlm_put(dlm); return -EINVAL; } +void dlm_assert_master_post_handler(int status, void *data, void *ret_data) +{ + struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data; + + if (ret_data) { + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + wake_up(&res->wq); + dlm_lockres_put(res); + } + return; +} + int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, int ignore_higher, u8 request_from, u32 flags) @@ -2278,6 +2297,9 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, int ret = -EINVAL; u8 node; unsigned int hash; + struct dlm_work_item *item; + int cleared = 0; + int dispatch = 0; if (!dlm_grab(dlm)) return 0; @@ -2308,27 +2330,140 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, spin_unlock(&dlm->spinlock); spin_lock(&res->spinlock); - BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); - if (test_bit(node, res->refmap)) { - ret = 0; - dlm_lockres_clear_refmap_bit(node, res); - } else { - mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " - "but it is already dropped!\n", dlm->name, namelen, - name, node); - __dlm_print_one_lock_resource(res); + if (res->state & DLM_LOCK_RES_SETREF_INPROG) + dispatch = 1; + else { + BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); + if (test_bit(node, res->refmap)) { + dlm_lockres_clear_refmap_bit(node, res); + cleared = 1; + } } spin_unlock(&res->spinlock); - if (!ret) - dlm_lockres_calc_usage(dlm, res); + if (!dispatch) { + if (cleared) + dlm_lockres_calc_usage(dlm, res); + else { + mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " + "but it is already dropped!\n", dlm->name, + res->lockname.len, res->lockname.name, node); + __dlm_print_one_lock_resource(res); + } + ret = 0; + goto done; + } + + item = kzalloc(sizeof(*item), GFP_NOFS); + if (!item) { + ret = -ENOMEM; + mlog_errno(ret); + goto done; + } + + dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL); + item->u.dl.deref_res = res; + item->u.dl.deref_node = node; + + spin_lock(&dlm->work_lock); + list_add_tail(&item->list, &dlm->work_list); + spin_unlock(&dlm->work_lock); + + queue_work(dlm->dlm_worker, &dlm->dispatched_work); + return 0; + done: if (res) dlm_lockres_put(res); dlm_put(dlm); + return ret; } +static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data) +{ + struct dlm_ctxt *dlm; + struct dlm_lock_resource *res; + u8 node; + u8 cleared = 0; + + dlm = item->dlm; + res = item->u.dl.deref_res; + node = item->u.dl.deref_node; + + spin_lock(&res->spinlock); + BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); + if (test_bit(node, res->refmap)) { + __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); + dlm_lockres_clear_refmap_bit(node, res); + cleared = 1; + } + spin_unlock(&res->spinlock); + + if (cleared) { + mlog(0, "%s:%.*s node %u ref dropped in dispatch\n", + dlm->name, res->lockname.len, res->lockname.name, node); + dlm_lockres_calc_usage(dlm, res); + } else { + mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " + "but it is already dropped!\n", dlm->name, + res->lockname.len, res->lockname.name, node); + __dlm_print_one_lock_resource(res); + } + + dlm_lockres_put(res); +} + +/* Checks whether the lockres can be migrated. Returns 0 if yes, < 0 + * if not. If 0, numlocks is set to the number of locks in the lockres. + */ +static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + int *numlocks) +{ + int ret; + int i; + int count = 0; + struct list_head *queue; + struct dlm_lock *lock; + + assert_spin_locked(&res->spinlock); + + ret = -EINVAL; + if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { + mlog(0, "cannot migrate lockres with unknown owner!\n"); + goto leave; + } + + if (res->owner != dlm->node_num) { + mlog(0, "cannot migrate lockres this node doesn't own!\n"); + goto leave; + } + + ret = 0; + queue = &res->granted; + for (i = 0; i < 3; i++) { + list_for_each_entry(lock, queue, list) { + ++count; + if (lock->ml.node == dlm->node_num) { + mlog(0, "found a lock owned by this node still " + "on the %s queue! will not migrate this " + "lockres\n", (i == 0 ? "granted" : + (i == 1 ? "converting" : + "blocked"))); + ret = -ENOTEMPTY; + goto leave; + } + } + queue++; + } + + *numlocks = count; + mlog(0, "migrateable lockres having %d locks\n", *numlocks); + +leave: + return ret; +} /* * DLM_MIGRATE_LOCKRES @@ -2342,14 +2477,12 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle = NULL; struct dlm_master_list_entry *oldmle = NULL; struct dlm_migratable_lockres *mres = NULL; - int ret = -EINVAL; + int ret = 0; const char *name; unsigned int namelen; int mle_added = 0; - struct list_head *queue, *iter; - int i; - struct dlm_lock *lock; - int empty = 1, wake = 0; + int numlocks; + int wake = 0; if (!dlm_grab(dlm)) return -EINVAL; @@ -2363,42 +2496,16 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm, * ensure this lockres is a proper candidate for migration */ spin_lock(&res->spinlock); - if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { - mlog(0, "cannot migrate lockres with unknown owner!\n"); - spin_unlock(&res->spinlock); - goto leave; - } - if (res->owner != dlm->node_num) { - mlog(0, "cannot migrate lockres this node doesn't own!\n"); + ret = dlm_is_lockres_migrateable(dlm, res, &numlocks); + if (ret < 0) { spin_unlock(&res->spinlock); goto leave; } - mlog(0, "checking queues...\n"); - queue = &res->granted; - for (i=0; i<3; i++) { - list_for_each(iter, queue) { - lock = list_entry (iter, struct dlm_lock, list); - empty = 0; - if (lock->ml.node == dlm->node_num) { - mlog(0, "found a lock owned by this node " - "still on the %s queue! will not " - "migrate this lockres\n", - i==0 ? "granted" : - (i==1 ? "converting" : "blocked")); - spin_unlock(&res->spinlock); - ret = -ENOTEMPTY; - goto leave; - } - } - queue++; - } - mlog(0, "all locks on this lockres are nonlocal. continuing\n"); spin_unlock(&res->spinlock); /* no work to do */ - if (empty) { + if (numlocks == 0) { mlog(0, "no locks were found on this lockres! done!\n"); - ret = 0; goto leave; } @@ -2634,16 +2741,27 @@ int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { int ret; int lock_dropped = 0; + int numlocks; + spin_lock(&res->spinlock); if (res->owner != dlm->node_num) { if (!__dlm_lockres_unused(res)) { mlog(ML_ERROR, "%s:%.*s: this node is not master, " "trying to free this but locks remain\n", dlm->name, res->lockname.len, res->lockname.name); } + spin_unlock(&res->spinlock); goto leave; } + /* No need to migrate a lockres having no locks */ + ret = dlm_is_lockres_migrateable(dlm, res, &numlocks); + if (ret >= 0 && numlocks == 0) { + spin_unlock(&res->spinlock); + goto leave; + } + spin_unlock(&res->spinlock); + /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */ spin_unlock(&dlm->spinlock); lock_dropped = 1; @@ -2794,18 +2912,16 @@ again: static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { - struct list_head *iter, *iter2; struct list_head *queue = &res->granted; int i, bit; - struct dlm_lock *lock; + struct dlm_lock *lock, *next; assert_spin_locked(&res->spinlock); BUG_ON(res->owner == dlm->node_num); for (i=0; i<3; i++) { - list_for_each_safe(iter, iter2, queue) { - lock = list_entry (iter, struct dlm_lock, list); + list_for_each_entry_safe(lock, next, queue, list) { if (lock->ml.node != dlm->node_num) { mlog(0, "putting lock for node %u\n", lock->ml.node); @@ -2847,7 +2963,6 @@ static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm, { int i; struct list_head *queue = &res->granted; - struct list_head *iter; struct dlm_lock *lock; int nodenum; @@ -2855,10 +2970,9 @@ static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm, spin_lock(&res->spinlock); for (i=0; i<3; i++) { - list_for_each(iter, queue) { + list_for_each_entry(lock, queue, list) { /* up to the caller to make sure this node * is alive */ - lock = list_entry (iter, struct dlm_lock, list); if (lock->ml.node != dlm->node_num) { spin_unlock(&res->spinlock); return lock->ml.node; @@ -3105,8 +3219,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) { - struct list_head *iter, *iter2; - struct dlm_master_list_entry *mle; + struct dlm_master_list_entry *mle, *next; struct dlm_lock_resource *res; unsigned int hash; @@ -3116,9 +3229,7 @@ top: /* clean the master list */ spin_lock(&dlm->master_lock); - list_for_each_safe(iter, iter2, &dlm->master_list) { - mle = list_entry(iter, struct dlm_master_list_entry, list); - + list_for_each_entry_safe(mle, next, &dlm->master_list, list) { BUG_ON(mle->type != DLM_MLE_BLOCK && mle->type != DLM_MLE_MASTER && mle->type != DLM_MLE_MIGRATION);