R: do_xxxx()
L: receive_xxxx_reply() <- R: send_xxxx_reply()
*/
-
+#include <linux/types.h>
#include "dlm_internal.h"
+#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
+#include "user.h"
#include "config.h"
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
{ -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
{ -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
};
-EXPORT_SYMBOL_GPL(dlm_lvb_operations);
#define modes_compat(gr, rq) \
__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
{0, 0, 0, 0, 0, 0, 0, 0} /* PD */
};
-static void dlm_print_lkb(struct dlm_lkb *lkb)
+void dlm_print_lkb(struct dlm_lkb *lkb)
{
printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
" status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
r->res_recover_locks_count, r->res_name);
}
+void dlm_dump_rsb(struct dlm_rsb *r)
+{
+ struct dlm_lkb *lkb;
+
+ dlm_print_rsb(r);
+
+ printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
+ list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
+ printk(KERN_ERR "rsb lookup list\n");
+ list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
+ dlm_print_lkb(lkb);
+ printk(KERN_ERR "rsb grant queue:\n");
+ list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
+ dlm_print_lkb(lkb);
+ printk(KERN_ERR "rsb convert queue:\n");
+ list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
+ dlm_print_lkb(lkb);
+ printk(KERN_ERR "rsb wait queue:\n");
+ list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
+ dlm_print_lkb(lkb);
+}
+
/* Threads cannot use the lockspace while it's being recovered */
static inline void lock_recovery(struct dlm_ls *ls)
{
if (lkb->lkb_flags & DLM_IFL_MSTCPY)
DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
- return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? TRUE : FALSE;
+ return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}
static inline int middle_conversion(struct dlm_lkb *lkb)
{
if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
(lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
- return TRUE;
- return FALSE;
+ return 1;
+ return 0;
}
static inline int down_conversion(struct dlm_lkb *lkb)
r->res_ls = ls;
r->res_length = len;
memcpy(r->res_name, name, len);
- init_MUTEX(&r->res_sem);
+ mutex_init(&r->res_mutex);
INIT_LIST_HEAD(&r->res_lookup);
INIT_LIST_HEAD(&r->res_grantqueue);
if (len == r->res_length && !memcmp(name, r->res_name, len))
goto found;
}
- return -ENOENT;
+ return -EBADR;
found:
if (r->res_nodeid && (flags & R_MASTER))
if (!error)
goto out;
- if (error == -ENOENT && !(flags & R_CREATE))
+ if (error == -EBADR && !(flags & R_CREATE))
goto out;
/* the rsb was found but wasn't a master copy */
{
int rv;
rv = kref_put(&r->res_ref, toss_rsb);
- DLM_ASSERT(!rv, dlm_print_rsb(r););
+ DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
static void kill_rsb(struct kref *kref)
/* All work is done after the return from kref_put() so we
can release the write_lock before the remove and free. */
- DLM_ASSERT(list_empty(&r->res_lookup),);
- DLM_ASSERT(list_empty(&r->res_grantqueue),);
- DLM_ASSERT(list_empty(&r->res_convertqueue),);
- DLM_ASSERT(list_empty(&r->res_waitqueue),);
- DLM_ASSERT(list_empty(&r->res_root_list),);
- DLM_ASSERT(list_empty(&r->res_recover_list),);
+ DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
lkb->lkb_nodeid = -1;
lkb->lkb_grmode = DLM_LOCK_IV;
kref_init(&lkb->lkb_ref);
+ INIT_LIST_HEAD(&lkb->lkb_ownqueue);
get_random_bytes(&bucket, sizeof(bucket));
bucket &= (ls->ls_lkbtbl_size - 1);
DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
-static int put_lkb(struct dlm_lkb *lkb)
+/* __put_lkb() is used when an lkb may not have an rsb attached to
+ it so we need to provide the lockspace explicitly */
+
+static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
- struct dlm_ls *ls = lkb->lkb_resource->res_ls;
uint16_t bucket = lkb->lkb_id & 0xFFFF;
write_lock(&ls->ls_lkbtbl[bucket].lock);
/* for local/process lkbs, lvbptr points to caller's lksb */
if (lkb->lkb_lvbptr && is_master_copy(lkb))
free_lvb(lkb->lkb_lvbptr);
- if (lkb->lkb_range)
- free_range(lkb->lkb_range);
free_lkb(lkb);
return 1;
} else {
int dlm_put_lkb(struct dlm_lkb *lkb)
{
- return put_lkb(lkb);
+ struct dlm_ls *ls;
+
+ DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
+ DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
+
+ ls = lkb->lkb_resource->res_ls;
+ return __put_lkb(ls, lkb);
}
/* This is only called to add a reference when the code already holds
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
- down(&ls->ls_waiters_sem);
+ mutex_lock(&ls->ls_waiters_mutex);
if (lkb->lkb_wait_type) {
log_print("add_to_waiters error %d", lkb->lkb_wait_type);
goto out;
kref_get(&lkb->lkb_ref);
list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
out:
- up(&ls->ls_waiters_sem);
+ mutex_unlock(&ls->ls_waiters_mutex);
}
+/* We clear the RESEND flag because we might be taking an lkb off the waiters
+ list as part of process_requestqueue (e.g. a lookup that has an optimized
+ request reply on the requestqueue) between dlm_recover_waiters_pre() which
+ set RESEND and dlm_recover_waiters_post() */
+
static int _remove_from_waiters(struct dlm_lkb *lkb)
{
int error = 0;
goto out;
}
lkb->lkb_wait_type = 0;
+ lkb->lkb_flags &= ~DLM_IFL_RESEND;
list_del(&lkb->lkb_wait_reply);
unhold_lkb(lkb);
out:
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error;
- down(&ls->ls_waiters_sem);
+ mutex_lock(&ls->ls_waiters_mutex);
error = _remove_from_waiters(lkb);
- up(&ls->ls_waiters_sem);
+ mutex_unlock(&ls->ls_waiters_mutex);
return error;
}
int count = 0, found;
for (;;) {
- found = FALSE;
+ found = 0;
write_lock(&ls->ls_rsbtbl[b].lock);
list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
res_hashchain) {
if (!time_after_eq(jiffies, r->res_toss_time +
- dlm_config.toss_secs * HZ))
+ dlm_config.ci_toss_secs * HZ))
continue;
- found = TRUE;
+ found = 1;
break;
}
if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
return;
- b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
+ b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
if (b == 1) {
int len = receive_extralen(ms);
memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
lkb->lkb_rqmode = DLM_LOCK_IV;
switch (lkb->lkb_status) {
+ case DLM_LKSTS_GRANTED:
+ break;
case DLM_LKSTS_CONVERT:
move_lkb(r, lkb, DLM_LKSTS_GRANTED);
break;
}
lkb->lkb_rqmode = DLM_LOCK_IV;
-
- if (lkb->lkb_range) {
- lkb->lkb_range[GR_RANGE_START] = lkb->lkb_range[RQ_RANGE_START];
- lkb->lkb_range[GR_RANGE_END] = lkb->lkb_range[RQ_RANGE_END];
- }
}
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
lkb_statequeue);
if (lkb->lkb_id == first->lkb_id)
- return TRUE;
-
- return FALSE;
-}
-
-/* Return 1 if the locks' ranges overlap. If the lkb has no range then it is
- assumed to cover 0-ffffffff.ffffffff */
-
-static inline int ranges_overlap(struct dlm_lkb *lkb1, struct dlm_lkb *lkb2)
-{
- if (!lkb1->lkb_range || !lkb2->lkb_range)
- return TRUE;
-
- if (lkb1->lkb_range[RQ_RANGE_END] < lkb2->lkb_range[GR_RANGE_START] ||
- lkb1->lkb_range[RQ_RANGE_START] > lkb2->lkb_range[GR_RANGE_END])
- return FALSE;
+ return 1;
- return TRUE;
+ return 0;
}
/* Check if the given lkb conflicts with another lkb on the queue. */
list_for_each_entry(this, head, lkb_statequeue) {
if (this == lkb)
continue;
- if (ranges_overlap(lkb, this) && !modes_compat(this, lkb))
- return TRUE;
+ if (!modes_compat(this, lkb))
+ return 1;
}
- return FALSE;
+ return 0;
}
/*
continue;
}
- if (!ranges_overlap(lkb, this))
- continue;
-
if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
- return TRUE;
+ return 1;
}
/* if lkb is on the convert queue and is preventing the first
if (self && self != first) {
if (!modes_compat(lkb, first) &&
!queue_conflict(&rsb->res_grantqueue, first))
- return TRUE;
+ return 1;
}
- return FALSE;
+ return 0;
}
/*
*/
if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
- return TRUE;
+ return 1;
/*
* A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
*/
if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
- return TRUE;
+ return 1;
/*
- * When using range locks the NOORDER flag is set to avoid the standard
- * vms rules on grant order.
+ * The NOORDER flag is set to avoid the standard vms rules on grant
+ * order.
*/
if (lkb->lkb_exflags & DLM_LKF_NOORDER)
- return TRUE;
+ return 1;
/*
* 6-3: Once in that queue [CONVERTING], a conversion request cannot be
*/
if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
- return TRUE;
+ return 1;
/*
* 6-4: By default, a new request is immediately granted only if all
if (now && !conv && list_empty(&r->res_convertqueue) &&
list_empty(&r->res_waitqueue))
- return TRUE;
+ return 1;
/*
* 6-4: Once a lock request is in the queue of ungranted new requests,
if (!now && !conv && list_empty(&r->res_convertqueue) &&
first_in_list(lkb, &r->res_waitqueue))
- return TRUE;
+ return 1;
out:
/*
lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
}
- return FALSE;
+ return 0;
}
/*
list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
demoted = is_demoted(lkb);
- if (can_be_granted(r, lkb, FALSE)) {
+ if (can_be_granted(r, lkb, 0)) {
grant_lock_pending(r, lkb);
grant_restart = 1;
} else {
struct dlm_lkb *lkb, *s;
list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
- if (can_be_granted(r, lkb, FALSE))
+ if (can_be_granted(r, lkb, 0))
grant_lock_pending(r, lkb);
else
high = max_t(int, lkb->lkb_rqmode, high);
struct dlm_lkb *lkb, *s;
int high = DLM_LOCK_IV;
- DLM_ASSERT(is_master(r), dlm_print_rsb(r););
+ DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
high = grant_pending_convert(r, high);
high = grant_pending_wait(r, high);
/*
* If there are locks left on the wait/convert queue then send blocking
* ASTs to granted locks based on the largest requested mode (high)
- * found above. This can generate spurious blocking ASTs for range
- * locks. FIXME: highbast < high comparison not valid for PR/CW.
+ * found above. FIXME: highbast < high comparison not valid for PR/CW.
*/
list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
list_for_each_entry(gr, head, lkb_statequeue) {
if (gr->lkb_bastaddr &&
gr->lkb_highbast < lkb->lkb_rqmode &&
- ranges_overlap(lkb, gr) && !modes_compat(gr, lkb)) {
+ !modes_compat(gr, lkb)) {
queue_bast(r, gr, lkb->lkb_rqmode);
gr->lkb_highbast = lkb->lkb_rqmode;
}
return 0;
}
- DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
+ DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
dir_nodeid = dlm_dir_nodeid(r);
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
int namelen, uint32_t parent_lkid, void *ast,
- void *astarg, void *bast, struct dlm_range *range,
- struct dlm_args *args)
+ void *astarg, void *bast, struct dlm_args *args)
{
int rv = -EINVAL;
args->bastaddr = bast;
args->mode = mode;
args->lksb = lksb;
- args->range = range;
rv = 0;
out:
return rv;
lkb->lkb_lksb = args->lksb;
lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
lkb->lkb_ownpid = (int) current->pid;
-
- rv = 0;
- if (!args->range)
- goto out;
-
- if (!lkb->lkb_range) {
- rv = -ENOMEM;
- lkb->lkb_range = allocate_range(ls);
- if (!lkb->lkb_range)
- goto out;
- /* This is needed for conversions that contain ranges
- where the original lock didn't but it's harmless for
- new locks too. */
- lkb->lkb_range[GR_RANGE_START] = 0LL;
- lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
- }
-
- lkb->lkb_range[RQ_RANGE_START] = args->range->ra_start;
- lkb->lkb_range[RQ_RANGE_END] = args->range->ra_end;
- lkb->lkb_flags |= DLM_IFL_RANGE;
rv = 0;
out:
return rv;
{
int error = 0;
- if (can_be_granted(r, lkb, TRUE)) {
+ if (can_be_granted(r, lkb, 1)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
goto out;
/* changing an existing lock may allow others to be granted */
- if (can_be_granted(r, lkb, TRUE)) {
+ if (can_be_granted(r, lkb, 1)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
grant_pending_locks(r);
return -DLM_EUNLOCK;
}
+/* FIXME: if revert_lock() finds that the lkb is granted, we should
+ skip the queue_cast(ECANCEL). It indicates that the request/convert
+ completed (and queued a normal ast) just before the cancel; we don't
+ want to clobber the sb_result for the normal ast with ECANCEL. */
+
static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
revert_lock(r, lkb);
return error;
}
-/* change some property of an existing lkb, e.g. mode, range */
+/* change some property of an existing lkb, e.g. mode */
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
uint32_t parent_lkid,
void (*ast) (void *astarg),
void *astarg,
- void (*bast) (void *astarg, int mode),
- struct dlm_range *range)
+ void (*bast) (void *astarg, int mode))
{
struct dlm_ls *ls;
struct dlm_lkb *lkb;
goto out;
error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
- astarg, bast, range, &args);
+ astarg, bast, &args);
if (error)
goto out_put;
error = 0;
out_put:
if (convert || error)
- put_lkb(lkb);
+ __put_lkb(ls, lkb);
if (error == -EAGAIN)
error = 0;
out:
if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
error = 0;
out_put:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
out:
unlock_recovery(ls);
dlm_put_lockspace(ls);
if (lkb->lkb_astaddr)
ms->m_asts |= AST_COMP;
- if (lkb->lkb_range) {
- ms->m_range[0] = lkb->lkb_range[RQ_RANGE_START];
- ms->m_range[1] = lkb->lkb_range[RQ_RANGE_END];
- }
+ /* compare with switch in create_message; send_remove() doesn't
+ use send_args() */
- if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
+ switch (ms->m_type) {
+ case DLM_MSG_REQUEST:
+ case DLM_MSG_LOOKUP:
memcpy(ms->m_extra, r->res_name, r->res_length);
-
- else if (lkb->lkb_lvbptr)
+ break;
+ case DLM_MSG_CONVERT:
+ case DLM_MSG_UNLOCK:
+ case DLM_MSG_REQUEST_REPLY:
+ case DLM_MSG_CONVERT_REPLY:
+ case DLM_MSG_GRANT:
+ if (!lkb->lkb_lvbptr)
+ break;
memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
-
+ break;
+ }
}
static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
if (!error && down_conversion(lkb)) {
remove_from_waiters(lkb);
r->res_ls->ls_stub_ms.m_result = 0;
+ r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
}
static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
lkb->lkb_exflags = ms->m_exflags;
+ lkb->lkb_sbflags = ms->m_sbflags;
lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
(ms->m_flags & 0x0000FFFF);
}
return (ms->m_header.h_length - sizeof(struct dlm_message));
}
-static int receive_range(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_message *ms)
-{
- if (lkb->lkb_flags & DLM_IFL_RANGE) {
- if (!lkb->lkb_range)
- lkb->lkb_range = allocate_range(ls);
- if (!lkb->lkb_range)
- return -ENOMEM;
- lkb->lkb_range[RQ_RANGE_START] = ms->m_range[0];
- lkb->lkb_range[RQ_RANGE_END] = ms->m_range[1];
- }
- return 0;
-}
-
static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms)
{
DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
- if (receive_range(ls, lkb, ms))
- return -ENOMEM;
-
- if (receive_lvb(ls, lkb, ms))
- return -ENOMEM;
+ if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
+ /* lkb was just created so there won't be an lvb yet */
+ lkb->lkb_lvbptr = allocate_lvb(ls);
+ if (!lkb->lkb_lvbptr)
+ return -ENOMEM;
+ }
return 0;
}
if (lkb->lkb_status != DLM_LKSTS_GRANTED)
return -EBUSY;
- if (receive_range(ls, lkb, ms))
- return -ENOMEM;
- if (lkb->lkb_range) {
- lkb->lkb_range[GR_RANGE_START] = 0LL;
- lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
- }
-
if (receive_lvb(ls, lkb, ms))
return -ENOMEM;
lkb->lkb_flags |= DLM_IFL_MSTCPY;
error = receive_request_args(ls, lkb, ms);
if (error) {
- put_lkb(lkb);
+ __put_lkb(ls, lkb);
goto fail;
}
error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
if (error) {
- put_lkb(lkb);
+ __put_lkb(ls, lkb);
goto fail;
}
if (error == -EINPROGRESS)
error = 0;
if (error)
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
return;
fail:
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
- int error, reply = TRUE;
+ int error, reply = 1;
error = find_lkb(ls, ms->m_remid, &lkb);
if (error)
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
return;
fail:
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
return;
fail:
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
return;
fail:
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
confirm_master(r, error);
break;
- case -ENOENT:
+ case -EBADR:
case -ENOTBLK:
/* find_rsb failed to find rsb or rsb wasn't master */
r->res_nodeid = -1;
unlock_rsb(r);
put_rsb(r);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
_receive_convert_reply(lkb, ms);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
_receive_unlock_reply(lkb, ms);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
_receive_cancel_reply(lkb, ms);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
out:
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
}
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
{
struct dlm_message *ms = (struct dlm_message *) hd;
struct dlm_ls *ls;
- int error;
+ int error = 0;
if (!recovery)
dlm_message_in(ms);
while (1) {
if (dlm_locking_stopped(ls)) {
- if (!recovery)
- dlm_add_requestqueue(ls, nodeid, hd);
- error = -EINTR;
- goto out;
+ if (recovery) {
+ error = -EINTR;
+ goto out;
+ }
+ error = dlm_add_requestqueue(ls, nodeid, hd);
+ if (error == -EAGAIN)
+ continue;
+ else {
+ error = -EINTR;
+ goto out;
+ }
}
if (lock_recovery_try(ls))
out:
dlm_put_lockspace(ls);
dlm_astd_wake();
- return 0;
+ return error;
}
if (middle_conversion(lkb)) {
hold_lkb(lkb);
ls->ls_stub_ms.m_result = -EINPROGRESS;
+ ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_remove_from_waiters(lkb);
_receive_convert_reply(lkb, &ls->ls_stub_ms);
{
struct dlm_lkb *lkb, *safe;
- down(&ls->ls_waiters_sem);
+ mutex_lock(&ls->ls_waiters_mutex);
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
case DLM_MSG_UNLOCK:
hold_lkb(lkb);
ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
+ ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_remove_from_waiters(lkb);
_receive_unlock_reply(lkb, &ls->ls_stub_ms);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
break;
case DLM_MSG_CANCEL:
hold_lkb(lkb);
ls->ls_stub_ms.m_result = -DLM_ECANCEL;
+ ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_remove_from_waiters(lkb);
_receive_cancel_reply(lkb, &ls->ls_stub_ms);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
break;
default:
log_error(ls, "invalid lkb wait_type %d",
lkb->lkb_wait_type);
}
+ schedule();
}
- up(&ls->ls_waiters_sem);
+ mutex_unlock(&ls->ls_waiters_mutex);
}
static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
struct dlm_lkb *lkb;
int rv = 0;
- down(&ls->ls_waiters_sem);
+ mutex_lock(&ls->ls_waiters_mutex);
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
if (lkb->lkb_flags & DLM_IFL_RESEND) {
rv = lkb->lkb_wait_type;
break;
}
}
- up(&ls->ls_waiters_sem);
+ mutex_unlock(&ls->ls_waiters_mutex);
if (!rv)
lkb = NULL;
hold_rsb(r);
lock_rsb(r);
_request_lock(r, lkb);
+ if (is_master(r))
+ confirm_master(r, 0);
unlock_rsb(r);
put_rsb(r);
break;
list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
if (test(ls, lkb)) {
+ rsb_set_flag(r, RSB_LOCKS_PURGED);
del_lkb(r, lkb);
/* this put should free the lkb */
- if (!put_lkb(lkb))
+ if (!dlm_put_lkb(lkb))
log_error(ls, "purged lkb not released");
}
}
return 0;
}
-int dlm_grant_after_purge(struct dlm_ls *ls)
+static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
+{
+ struct dlm_rsb *r, *r_ret = NULL;
+
+ read_lock(&ls->ls_rsbtbl[bucket].lock);
+ list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
+ if (!rsb_flag(r, RSB_LOCKS_PURGED))
+ continue;
+ hold_rsb(r);
+ rsb_clear_flag(r, RSB_LOCKS_PURGED);
+ r_ret = r;
+ break;
+ }
+ read_unlock(&ls->ls_rsbtbl[bucket].lock);
+ return r_ret;
+}
+
+void dlm_grant_after_purge(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- int i;
+ int bucket = 0;
- for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- read_lock(&ls->ls_rsbtbl[i].lock);
- list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
- hold_rsb(r);
- lock_rsb(r);
- if (is_master(r)) {
- grant_pending_locks(r);
- confirm_master(r, 0);
- }
- unlock_rsb(r);
- put_rsb(r);
+ while (1) {
+ r = find_purged_rsb(ls, bucket);
+ if (!r) {
+ if (bucket == ls->ls_rsbtbl_size - 1)
+ break;
+ bucket++;
+ continue;
}
- read_unlock(&ls->ls_rsbtbl[i].lock);
+ lock_rsb(r);
+ if (is_master(r)) {
+ grant_pending_locks(r);
+ confirm_master(r, 0);
+ }
+ unlock_rsb(r);
+ put_rsb(r);
+ schedule();
}
-
- return 0;
}
static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
- if (lkb->lkb_flags & DLM_IFL_RANGE) {
- lkb->lkb_range = allocate_range(ls);
- if (!lkb->lkb_range)
- return -ENOMEM;
- memcpy(lkb->lkb_range, rl->rl_range, 4*sizeof(uint64_t));
- }
-
if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
lkb->lkb_lvbptr = allocate_lvb(ls);
if (!lkb->lkb_lvbptr)
error = receive_rcom_lock_args(ls, lkb, r, rc);
if (error) {
- put_lkb(lkb);
+ __put_lkb(ls, lkb);
goto out_unlock;
}
lock_rsb(r);
switch (error) {
+ case -EBADR:
+ /* There's a chance the new master received our lock before
+ dlm_recover_master_reply(), this wouldn't happen if we did
+ a barrier between recover_masters and recover_locks. */
+ log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
+ (unsigned long)r, r->res_name);
+ dlm_send_rcom_lock(r, lkb);
+ goto out;
case -EEXIST:
log_debug(ls, "master copy exists %x", lkb->lkb_id);
/* fall through */
/* an ack for dlm_recover_locks() which waits for replies from
all the locks it sends to new masters */
dlm_recovered_lock(r);
-
+ out:
unlock_rsb(r);
put_rsb(r);
- put_lkb(lkb);
+ dlm_put_lkb(lkb);
+
+ return 0;
+}
+
+int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
+ int mode, uint32_t flags, void *name, unsigned int namelen,
+ uint32_t parent_lkid)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ int error;
+
+ lock_recovery(ls);
+
+ error = create_lkb(ls, &lkb);
+ if (error) {
+ kfree(ua);
+ goto out;
+ }
+
+ if (flags & DLM_LKF_VALBLK) {
+ ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+ if (!ua->lksb.sb_lvbptr) {
+ kfree(ua);
+ __put_lkb(ls, lkb);
+ error = -ENOMEM;
+ goto out;
+ }
+ }
+ /* After ua is attached to lkb it will be freed by free_lkb().
+ When DLM_IFL_USER is set, the dlm knows that this is a userspace
+ lock and that lkb_astparam is the dlm_user_args structure. */
+
+ error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
+ DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
+ lkb->lkb_flags |= DLM_IFL_USER;
+ ua->old_mode = DLM_LOCK_IV;
+
+ if (error) {
+ __put_lkb(ls, lkb);
+ goto out;
+ }
+
+ error = request_lock(ls, lkb, name, namelen, &args);
+
+ switch (error) {
+ case 0:
+ break;
+ case -EINPROGRESS:
+ error = 0;
+ break;
+ case -EAGAIN:
+ error = 0;
+ /* fall through */
+ default:
+ __put_lkb(ls, lkb);
+ goto out;
+ }
+
+ /* add this new lkb to the per-process list of locks */
+ spin_lock(&ua->proc->locks_spin);
+ kref_get(&lkb->lkb_ref);
+ list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
+ spin_unlock(&ua->proc->locks_spin);
+ out:
+ unlock_recovery(ls);
+ return error;
+}
+
+int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+ int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ struct dlm_user_args *ua;
+ int error;
+
+ lock_recovery(ls);
+
+ error = find_lkb(ls, lkid, &lkb);
+ if (error)
+ goto out;
+
+ /* user can change the params on its lock when it converts it, or
+ add an lvb that didn't exist before */
+
+ ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+ if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
+ ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+ if (!ua->lksb.sb_lvbptr) {
+ error = -ENOMEM;
+ goto out_put;
+ }
+ }
+ if (lvb_in && ua->lksb.sb_lvbptr)
+ memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+
+ ua->castparam = ua_tmp->castparam;
+ ua->castaddr = ua_tmp->castaddr;
+ ua->bastparam = ua_tmp->bastparam;
+ ua->bastaddr = ua_tmp->bastaddr;
+ ua->user_lksb = ua_tmp->user_lksb;
+ ua->old_mode = lkb->lkb_grmode;
+
+ error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
+ ua, DLM_FAKE_USER_AST, &args);
+ if (error)
+ goto out_put;
+
+ error = convert_lock(ls, lkb, &args);
+
+ if (error == -EINPROGRESS || error == -EAGAIN)
+ error = 0;
+ out_put:
+ dlm_put_lkb(lkb);
+ out:
+ unlock_recovery(ls);
+ kfree(ua_tmp);
+ return error;
+}
+
+int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+ uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ struct dlm_user_args *ua;
+ int error;
+
+ lock_recovery(ls);
+
+ error = find_lkb(ls, lkid, &lkb);
+ if (error)
+ goto out;
+
+ ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+ if (lvb_in && ua->lksb.sb_lvbptr)
+ memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+ ua->castparam = ua_tmp->castparam;
+ ua->user_lksb = ua_tmp->user_lksb;
+
+ error = set_unlock_args(flags, ua, &args);
+ if (error)
+ goto out_put;
+
+ error = unlock_lock(ls, lkb, &args);
+
+ if (error == -DLM_EUNLOCK)
+ error = 0;
+ if (error)
+ goto out_put;
+
+ spin_lock(&ua->proc->locks_spin);
+ /* dlm_user_add_ast() may have already taken lkb off the proc list */
+ if (!list_empty(&lkb->lkb_ownqueue))
+ list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
+ spin_unlock(&ua->proc->locks_spin);
+ out_put:
+ dlm_put_lkb(lkb);
+ out:
+ unlock_recovery(ls);
+ return error;
+}
+
+int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+ uint32_t flags, uint32_t lkid)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ struct dlm_user_args *ua;
+ int error;
+
+ lock_recovery(ls);
+
+ error = find_lkb(ls, lkid, &lkb);
+ if (error)
+ goto out;
+
+ ua = (struct dlm_user_args *)lkb->lkb_astparam;
+ ua->castparam = ua_tmp->castparam;
+ ua->user_lksb = ua_tmp->user_lksb;
+
+ error = set_unlock_args(flags, ua, &args);
+ if (error)
+ goto out_put;
+
+ error = cancel_lock(ls, lkb, &args);
+
+ if (error == -DLM_ECANCEL)
+ error = 0;
+ if (error)
+ goto out_put;
+
+ /* this lkb was removed from the WAITING queue */
+ if (lkb->lkb_grmode == DLM_LOCK_IV) {
+ spin_lock(&ua->proc->locks_spin);
+ list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
+ spin_unlock(&ua->proc->locks_spin);
+ }
+ out_put:
+ dlm_put_lkb(lkb);
+ out:
+ unlock_recovery(ls);
+ return error;
+}
+
+static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+ struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+ if (ua->lksb.sb_lvbptr)
+ kfree(ua->lksb.sb_lvbptr);
+ kfree(ua);
+ lkb->lkb_astparam = (long)NULL;
+
+ /* TODO: propogate to master if needed */
return 0;
}
+/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
+ Regardless of what rsb queue the lock is on, it's removed and freed. */
+
+static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+ struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+ struct dlm_args args;
+ int error;
+
+ /* FIXME: we need to handle the case where the lkb is in limbo
+ while the rsb is being looked up, currently we assert in
+ _unlock_lock/is_remote because rsb nodeid is -1. */
+
+ set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
+
+ error = unlock_lock(ls, lkb, &args);
+ if (error == -DLM_EUNLOCK)
+ error = 0;
+ return error;
+}
+
+/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
+ 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
+ which we clear here. */
+
+/* proc CLOSING flag is set so no more device_reads should look at proc->asts
+ list, and no more device_writes should add lkb's to proc->locks list; so we
+ shouldn't need to take asts_spin or locks_spin here. this assumes that
+ device reads/writes/closes are serialized -- FIXME: we may need to serialize
+ them ourself. */
+
+void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
+{
+ struct dlm_lkb *lkb, *safe;
+
+ lock_recovery(ls);
+ mutex_lock(&ls->ls_clear_proc_locks);
+
+ list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
+ list_del_init(&lkb->lkb_ownqueue);
+
+ if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
+ lkb->lkb_flags |= DLM_IFL_ORPHAN;
+ orphan_proc_lock(ls, lkb);
+ } else {
+ lkb->lkb_flags |= DLM_IFL_DEAD;
+ unlock_proc_lock(ls, lkb);
+ }
+
+ /* this removes the reference for the proc->locks list
+ added by dlm_user_request, it may result in the lkb
+ being freed */
+
+ dlm_put_lkb(lkb);
+ }
+
+ /* in-progress unlocks */
+ list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
+ list_del_init(&lkb->lkb_ownqueue);
+ lkb->lkb_flags |= DLM_IFL_DEAD;
+ dlm_put_lkb(lkb);
+ }
+
+ list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+ list_del(&lkb->lkb_astqueue);
+ dlm_put_lkb(lkb);
+ }
+
+ mutex_unlock(&ls->ls_clear_proc_locks);
+ unlock_recovery(ls);
+}
+