svcrdma: Free context on ib_post_recv error
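
This blobdiff folds together several svcrdma changes (the transport reference-counting rework is also visible below); the headline fix is that svc_rdma_post_recv() now releases its receive context when ib_post_recv() fails, instead of leaking it. A minimal sketch of that error path follows, simplified from the svc_rdma_post_recv() hunk further down: the page/sge setup is elided behind a comment, and svc_rdma_get_context() is assumed to be the context allocator in the surrounding file; this is an illustrative fragment, not standalone code.

    int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
    {
            struct ib_recv_wr recv_wr, *bad_recv_wr;
            struct svc_rdma_op_ctxt *ctxt;
            int ret;

            ctxt = svc_rdma_get_context(xprt);
            memset(&recv_wr, 0, sizeof(recv_wr));
            /* ...map receive pages and fill in recv_wr.sg_list/num_sge (elided)... */
            recv_wr.wr_id = (u64)(unsigned long)ctxt;

            ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
            if (ret)
                    /* The WR was never posted; return the context now. */
                    svc_rdma_put_context(ctxt, 1);
            return ret;
    }

Without the put on failure, the svc_rdma_op_ctxt would stay outstanding forever, since no receive completion will ever arrive for a WR that was never posted.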
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index f09444c451bc232540baf771fa69499531ffca35..e85ac77f4954860a5bbe5c5f8b5652ea2f31fe0e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -54,7 +54,6 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
                                        int flags);
 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
 static void svc_rdma_release_rqst(struct svc_rqst *);
-static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
 static void dto_tasklet_func(unsigned long data);
 static void svc_rdma_detach(struct svc_xprt *xprt);
 static void svc_rdma_free(struct svc_xprt *xprt);
@@ -229,24 +228,10 @@ static void dto_tasklet_func(unsigned long data)
                list_del_init(&xprt->sc_dto_q);
                spin_unlock_irqrestore(&dto_lock, flags);
 
-               if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
-                       ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-                       rq_cq_reap(xprt);
-                       set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-                       /*
-                        * If data arrived before established event,
-                        * don't enqueue. This defers RPC I/O until the
-                        * RDMA connection is complete.
-                        */
-                       if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-                               svc_xprt_enqueue(&xprt->sc_xprt);
-               }
-
-               if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
-                       ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-                       sq_cq_reap(xprt);
-               }
+               rq_cq_reap(xprt);
+               sq_cq_reap(xprt);
 
+               svc_xprt_put(&xprt->sc_xprt);
                spin_lock_irqsave(&dto_lock, flags);
        }
        spin_unlock_irqrestore(&dto_lock, flags);
@@ -275,8 +260,10 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
         * add it
         */
        spin_lock_irqsave(&dto_lock, flags);
-       if (list_empty(&xprt->sc_dto_q))
+       if (list_empty(&xprt->sc_dto_q)) {
+               svc_xprt_get(&xprt->sc_xprt);
                list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+       }
        spin_unlock_irqrestore(&dto_lock, flags);
 
        /* Tasklet does all the work to avoid irqsave locks. */
@@ -295,6 +282,10 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
        struct ib_wc wc;
        struct svc_rdma_op_ctxt *ctxt = NULL;
 
+       if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
+               return;
+
+       ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
        atomic_inc(&rdma_stat_rq_poll);
 
        spin_lock_bh(&xprt->sc_rq_dto_lock);
@@ -314,6 +305,15 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
 
        if (ctxt)
                atomic_inc(&rdma_stat_rq_prod);
+
+       set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+       /*
+        * If data arrived before established event,
+        * don't enqueue. This defers RPC I/O until the
+        * RDMA connection is complete.
+        */
+       if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+               svc_xprt_enqueue(&xprt->sc_xprt);
 }
 
 /*
@@ -326,6 +326,11 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
        struct ib_cq *cq = xprt->sc_sq_cq;
        int ret;
 
+
+       if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
+               return;
+
+       ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
        atomic_inc(&rdma_stat_sq_poll);
        while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
                ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
@@ -386,8 +391,10 @@ static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
         * add it
         */
        spin_lock_irqsave(&dto_lock, flags);
-       if (list_empty(&xprt->sc_dto_q))
+       if (list_empty(&xprt->sc_dto_q)) {
+               svc_xprt_get(&xprt->sc_xprt);
                list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+       }
        spin_unlock_irqrestore(&dto_lock, flags);
 
        /* Tasklet does all the work to avoid irqsave locks. */
@@ -517,6 +524,8 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
        recv_wr.wr_id = (u64)(unsigned long)ctxt;
 
        ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+       if (ret)
+               svc_rdma_put_context(ctxt, 1);
        return ret;
 }
 
@@ -611,6 +620,7 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id,
        switch (event->event) {
        case RDMA_CM_EVENT_ESTABLISHED:
                /* Accept complete */
+               svc_xprt_get(xprt);
                dprintk("svcrdma: Connection completed on DTO xprt=%p, "
                        "cm_id=%p\n", xprt, cma_id);
                clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
@@ -622,6 +632,7 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id,
                if (xprt) {
                        set_bit(XPT_CLOSE, &xprt->xpt_flags);
                        svc_xprt_enqueue(xprt);
+                       svc_xprt_put(xprt);
                }
                break;
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
@@ -661,15 +672,15 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 
        listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
        if (IS_ERR(listen_id)) {
-               rdma_destroy_xprt(cma_xprt);
+               svc_xprt_put(&cma_xprt->sc_xprt);
                dprintk("svcrdma: rdma_create_id failed = %ld\n",
                        PTR_ERR(listen_id));
                return (void *)listen_id;
        }
        ret = rdma_bind_addr(listen_id, sa);
        if (ret) {
-               rdma_destroy_xprt(cma_xprt);
                rdma_destroy_id(listen_id);
+               svc_xprt_put(&cma_xprt->sc_xprt);
                dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
                return ERR_PTR(ret);
        }
@@ -678,8 +689,9 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
        ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
        if (ret) {
                rdma_destroy_id(listen_id);
-               rdma_destroy_xprt(cma_xprt);
+               svc_xprt_put(&cma_xprt->sc_xprt);
                dprintk("svcrdma: rdma_listen failed = %d\n", ret);
+               return ERR_PTR(ret);
        }
 
        /*
@@ -820,6 +832,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
                newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
                newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
        }
+       svc_xprt_get(&newxprt->sc_xprt);
        newxprt->sc_qp = newxprt->sc_cm_id->qp;
 
        /* Register all of physical memory */
@@ -891,82 +904,76 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 
  errout:
        dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
+       /* Take a reference in case the DTO handler runs */
+       svc_xprt_get(&newxprt->sc_xprt);
+       if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) {
+               ib_destroy_qp(newxprt->sc_qp);
+               svc_xprt_put(&newxprt->sc_xprt);
+       }
        rdma_destroy_id(newxprt->sc_cm_id);
-       rdma_destroy_xprt(newxprt);
+       /* This call to put will destroy the transport */
+       svc_xprt_put(&newxprt->sc_xprt);
        return NULL;
 }
 
-/*
- * Post an RQ WQE to the RQ when the rqst is being released. This
- * effectively returns an RQ credit to the client. The rq_xprt_ctxt
- * will be null if the request is deferred due to an RDMA_READ or the
- * transport had no data ready (EAGAIN). Note that an RPC deferred in
- * svc_process will still return the credit, this is because the data
- * is copied and no longer consume a WQE/WC.
- */
 static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
 {
-       int err;
-       struct svcxprt_rdma *rdma =
-               container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
-       if (rqstp->rq_xprt_ctxt) {
-               BUG_ON(rqstp->rq_xprt_ctxt != rdma);
-               err = svc_rdma_post_recv(rdma);
-               if (err)
-                       dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
-                               err);
-       }
-       rqstp->rq_xprt_ctxt = NULL;
 }
 
-/* Disable data ready events for this connection */
+/*
+ * When connected, an svc_xprt has at least three references:
+ *
+ * - A reference held by the QP. We still hold that here because this
+ *   code deletes the QP and puts the reference.
+ *
+ * - A reference held by the cm_id between the ESTABLISHED and
+ *   DISCONNECTED events. If the remote peer disconnected first, this
+ *   reference could be gone.
+ *
+ * - A reference held by the svc_recv code that called this function
+ *   as part of close processing.
+ *
+ * At a minimum two references should still be held.
+ */
 static void svc_rdma_detach(struct svc_xprt *xprt)
 {
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
-       unsigned long flags;
-
        dprintk("svc: svc_rdma_detach(%p)\n", xprt);
-       /*
-        * Shutdown the connection. This will ensure we don't get any
-        * more events from the provider.
-        */
+
+       /* Disconnect and flush posted WQE */
        rdma_disconnect(rdma->sc_cm_id);
-       rdma_destroy_id(rdma->sc_cm_id);
 
-       /* We may already be on the DTO list */
-       spin_lock_irqsave(&dto_lock, flags);
-       if (!list_empty(&rdma->sc_dto_q))
-               list_del_init(&rdma->sc_dto_q);
-       spin_unlock_irqrestore(&dto_lock, flags);
+       /* Destroy the QP if present (not a listener) */
+       if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
+               ib_destroy_qp(rdma->sc_qp);
+               svc_xprt_put(xprt);
+       }
+
+       /* Destroy the CM ID */
+       rdma_destroy_id(rdma->sc_cm_id);
 }
 
 static void svc_rdma_free(struct svc_xprt *xprt)
 {
        struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
        dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
-       rdma_destroy_xprt(rdma);
-       kfree(rdma);
-}
+       /* We should only be called from kref_put */
+       BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0);
+       if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
+               ib_destroy_cq(rdma->sc_sq_cq);
 
-static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
-{
-       if (xprt->sc_qp && !IS_ERR(xprt->sc_qp))
-               ib_destroy_qp(xprt->sc_qp);
+       if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
+               ib_destroy_cq(rdma->sc_rq_cq);
 
-       if (xprt->sc_sq_cq && !IS_ERR(xprt->sc_sq_cq))
-               ib_destroy_cq(xprt->sc_sq_cq);
+       if (rdma->sc_phys_mr && !IS_ERR(rdma->sc_phys_mr))
+               ib_dereg_mr(rdma->sc_phys_mr);
 
-       if (xprt->sc_rq_cq && !IS_ERR(xprt->sc_rq_cq))
-               ib_destroy_cq(xprt->sc_rq_cq);
+       if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
+               ib_dealloc_pd(rdma->sc_pd);
 
-       if (xprt->sc_phys_mr && !IS_ERR(xprt->sc_phys_mr))
-               ib_dereg_mr(xprt->sc_phys_mr);
-
-       if (xprt->sc_pd && !IS_ERR(xprt->sc_pd))
-               ib_dealloc_pd(xprt->sc_pd);
-
-       destroy_context_cache(xprt->sc_ctxt_head);
+       destroy_context_cache(rdma->sc_ctxt_head);
+       kfree(rdma);
 }
 
 static int svc_rdma_has_wspace(struct svc_xprt *xprt)
@@ -998,7 +1005,7 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
        int ret;
 
        if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
-               return 0;
+               return -ENOTCONN;
 
        BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
        BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
@@ -1009,13 +1016,16 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
                if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
                        spin_unlock_bh(&xprt->sc_lock);
                        atomic_inc(&rdma_stat_sq_starve);
-                       /* See if we can reap some SQ WR */
+
+                       /* See if we can opportunistically reap SQ WR to make room */
                        sq_cq_reap(xprt);
 
                        /* Wait until SQ WR available if SQ still full */
                        wait_event(xprt->sc_send_wait,
                                   atomic_read(&xprt->sc_sq_count) <
                                   xprt->sc_sq_depth);
+                       if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
+                               return 0;
                        continue;
                }
                /* Bumped used SQ WR count and post */