]> pilppa.org Git - linux-2.6-omap-h63xx.git/blobdiff - fs/dlm/lock.c
[MTD] jedec probe: drop unnecessary forward declarations
[linux-2.6-omap-h63xx.git] / fs / dlm / lock.c
index b455919c19984ad408d4ca498ad72f40a7d33d1f..3915b8e1414623db82bce20d0d4b4c41fc6b9fa3 100644 (file)
@@ -1670,9 +1670,10 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
    with a deadlk here, we'd have to generate something like grant_lock with
    the deadlk error.) */
 
-/* returns the highest requested mode of all blocked conversions */
+/* Returns the highest requested mode of all blocked conversions; sets
+   cw if there's a blocked conversion to DLM_LOCK_CW. */
 
-static int grant_pending_convert(struct dlm_rsb *r, int high)
+static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
 {
        struct dlm_lkb *lkb, *s;
        int hi, demoted, quit, grant_restart, demote_restart;
@@ -1709,6 +1710,9 @@ static int grant_pending_convert(struct dlm_rsb *r, int high)
                }
 
                hi = max_t(int, lkb->lkb_rqmode, hi);
+
+               if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
+                       *cw = 1;
        }
 
        if (grant_restart)
@@ -1721,29 +1725,52 @@ static int grant_pending_convert(struct dlm_rsb *r, int high)
        return max_t(int, high, hi);
 }
 
-static int grant_pending_wait(struct dlm_rsb *r, int high)
+static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
 {
        struct dlm_lkb *lkb, *s;
 
        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
                if (can_be_granted(r, lkb, 0, NULL))
                        grant_lock_pending(r, lkb);
-                else
+                else {
                        high = max_t(int, lkb->lkb_rqmode, high);
+                       if (lkb->lkb_rqmode == DLM_LOCK_CW)
+                               *cw = 1;
+               }
        }
 
        return high;
 }
 
+/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
+   on either the convert or waiting queue.
+   high is the largest rqmode of all locks blocked on the convert or
+   waiting queue. */
+
+static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
+{
+       if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
+               if (gr->lkb_highbast < DLM_LOCK_EX)
+                       return 1;
+               return 0;
+       }
+
+       if (gr->lkb_highbast < high &&
+           !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
+               return 1;
+       return 0;
+}
+
 static void grant_pending_locks(struct dlm_rsb *r)
 {
        struct dlm_lkb *lkb, *s;
        int high = DLM_LOCK_IV;
+       int cw = 0;
 
        DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
 
-       high = grant_pending_convert(r, high);
-       high = grant_pending_wait(r, high);
+       high = grant_pending_convert(r, high, &cw);
+       high = grant_pending_wait(r, high, &cw);
 
        if (high == DLM_LOCK_IV)
                return;
@@ -1751,27 +1778,41 @@ static void grant_pending_locks(struct dlm_rsb *r)
        /*
         * If there are locks left on the wait/convert queue then send blocking
         * ASTs to granted locks based on the largest requested mode (high)
-        * found above. FIXME: highbast < high comparison not valid for PR/CW.
+        * found above.
         */
 
        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
-               if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
-                   !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
-                       queue_bast(r, lkb, high);
+               if (lkb->lkb_bastaddr && lock_requires_bast(lkb, high, cw)) {
+                       if (cw && high == DLM_LOCK_PR)
+                               queue_bast(r, lkb, DLM_LOCK_CW);
+                       else
+                               queue_bast(r, lkb, high);
                        lkb->lkb_highbast = high;
                }
        }
 }
 
+static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
+{
+       if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
+           (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
+               if (gr->lkb_highbast < DLM_LOCK_EX)
+                       return 1;
+               return 0;
+       }
+
+       if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
+               return 1;
+       return 0;
+}
+
 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
                            struct dlm_lkb *lkb)
 {
        struct dlm_lkb *gr;
 
        list_for_each_entry(gr, head, lkb_statequeue) {
-               if (gr->lkb_bastaddr &&
-                   gr->lkb_highbast < lkb->lkb_rqmode &&
-                   !modes_compat(gr, lkb)) {
+               if (gr->lkb_bastaddr && modes_require_bast(gr, lkb)) {
                        queue_bast(r, gr, lkb->lkb_rqmode);
                        gr->lkb_highbast = lkb->lkb_rqmode;
                }
@@ -2235,7 +2276,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
           before we try again to grant this one. */
 
        if (is_demoted(lkb)) {
-               grant_pending_convert(r, DLM_LOCK_IV);
+               grant_pending_convert(r, DLM_LOCK_IV, NULL);
                if (_can_be_granted(r, lkb, 1)) {
                        grant_lock(r, lkb);
                        queue_cast(r, lkb, 0);
@@ -3597,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
        dlm_put_lkb(lkb);
 }
 
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
 {
-       struct dlm_message *ms = (struct dlm_message *) hd;
-       struct dlm_ls *ls;
-       int error = 0;
-
-       if (!recovery)
-               dlm_message_in(ms);
-
-       ls = dlm_find_lockspace_global(hd->h_lockspace);
-       if (!ls) {
-               log_print("drop message %d from %d for unknown lockspace %d",
-                         ms->m_type, nodeid, hd->h_lockspace);
-               return -EINVAL;
-       }
-
-       /* recovery may have just ended leaving a bunch of backed-up requests
-          in the requestqueue; wait while dlm_recoverd clears them */
-
-       if (!recovery)
-               dlm_wait_requestqueue(ls);
-
-       /* recovery may have just started while there were a bunch of
-          in-flight requests -- save them in requestqueue to be processed
-          after recovery.  we can't let dlm_recvd block on the recovery
-          lock.  if dlm_recoverd is calling this function to clear the
-          requestqueue, it needs to be interrupted (-EINTR) if another
-          recovery operation is starting. */
-
-       while (1) {
-               if (dlm_locking_stopped(ls)) {
-                       if (recovery) {
-                               error = -EINTR;
-                               goto out;
-                       }
-                       error = dlm_add_requestqueue(ls, nodeid, hd);
-                       if (error == -EAGAIN)
-                               continue;
-                       else {
-                               error = -EINTR;
-                               goto out;
-                       }
-               }
-
-               if (dlm_lock_recovery_try(ls))
-                       break;
-               schedule();
-       }
-
        switch (ms->m_type) {
 
        /* messages sent to a master node */
@@ -3720,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
                log_error(ls, "unknown message type %d", ms->m_type);
        }
 
-       dlm_unlock_recovery(ls);
- out:
-       dlm_put_lockspace(ls);
        dlm_astd_wake();
-       return error;
 }
 
+/* If the lockspace is in recovery mode (locking stopped), then normal
+   messages are saved on the requestqueue for processing after recovery is
+   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
+   messages off the requestqueue before we process new ones. This occurs right
+   after recovery completes when we transition from saving all messages on
+   requestqueue, to processing all the saved messages, to processing new
+   messages as they arrive. */
 
-/*
- * Recovery related
- */
+static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+                               int nodeid)
+{
+       if (dlm_locking_stopped(ls)) {
+               dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
+       } else {
+               dlm_wait_requestqueue(ls);
+               _receive_message(ls, ms);
+       }
+}
+
+/* This is called by dlm_recoverd to process messages that were saved on
+   the requestqueue. */
+
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+{
+       _receive_message(ls, ms);
+}
+
+/* This is called by the midcomms layer when something is received for
+   the lockspace.  It could be either a MSG (normal message sent as part of
+   standard locking activity) or an RCOM (recovery message sent as part of
+   lockspace recovery). */
+
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
+{
+       struct dlm_message *ms = (struct dlm_message *) hd;
+       struct dlm_rcom *rc = (struct dlm_rcom *) hd;
+       struct dlm_ls *ls;
+       int type = 0;
+
+       switch (hd->h_cmd) {
+       case DLM_MSG:
+               dlm_message_in(ms);
+               type = ms->m_type;
+               break;
+       case DLM_RCOM:
+               dlm_rcom_in(rc);
+               type = rc->rc_type;
+               break;
+       default:
+               log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
+               return;
+       }
+
+       if (hd->h_nodeid != nodeid) {
+               log_print("invalid h_nodeid %d from %d lockspace %x",
+                         hd->h_nodeid, nodeid, hd->h_lockspace);
+               return;
+       }
+
+       ls = dlm_find_lockspace_global(hd->h_lockspace);
+       if (!ls) {
+               log_print("invalid h_lockspace %x from %d cmd %d type %d",
+                         hd->h_lockspace, nodeid, hd->h_cmd, type);
+
+               if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
+                       dlm_send_ls_not_ready(nodeid, rc);
+               return;
+       }
+
+       /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
+          be inactive (in this ls) before transitioning to recovery mode */
+
+       down_read(&ls->ls_recv_active);
+       if (hd->h_cmd == DLM_MSG)
+               dlm_receive_message(ls, ms, nodeid);
+       else
+               dlm_receive_rcom(ls, rc, nodeid);
+       up_read(&ls->ls_recv_active);
+
+       dlm_put_lockspace(ls);
+}
 
 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
@@ -4388,7 +4455,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 
        if (lvb_in && ua->lksb.sb_lvbptr)
                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
-       ua->castparam = ua_tmp->castparam;
+       if (ua_tmp->castparam)
+               ua->castparam = ua_tmp->castparam;
        ua->user_lksb = ua_tmp->user_lksb;
 
        error = set_unlock_args(flags, ua, &args);
@@ -4433,7 +4501,8 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
                goto out;
 
        ua = (struct dlm_user_args *)lkb->lkb_astparam;
-       ua->castparam = ua_tmp->castparam;
+       if (ua_tmp->castparam)
+               ua->castparam = ua_tmp->castparam;
        ua->user_lksb = ua_tmp->user_lksb;
 
        error = set_unlock_args(flags, ua, &args);