]> pilppa.org Git - linux-2.6-omap-h63xx.git/blobdiff - net/sunrpc/xprt.c
Merge master.kernel.org:/pub/scm/linux/kernel/git/gregkh/i2c-2.6
[linux-2.6-omap-h63xx.git] / net / sunrpc / xprt.c
index 1ac2fbe05102a59e1e7f077ef0f203a3c0c943e3..8ff2c8acb22316b00f3bedfb6fc0671276fbdcec 100644 (file)
@@ -62,16 +62,85 @@ static inline void  do_xprt_reserve(struct rpc_task *);
 static void    xprt_connect_status(struct rpc_task *task);
 static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
 
-static int     xprt_clear_backlog(struct rpc_xprt *xprt);
+/*
+ * The transport code maintains an estimate on the maximum number of out-
+ * standing RPC requests, using a smoothed version of the congestion
+ * avoidance implemented in 44BSD. This is basically the Van Jacobson
+ * congestion algorithm: If a retransmit occurs, the congestion window is
+ * halved; otherwise, it is incremented by 1/cwnd when
+ *
+ *     -       a reply is received and
+ *     -       a full number of requests are outstanding and
+ *     -       the congestion window hasn't been updated recently.
+ */
+#define RPC_CWNDSHIFT          (8U)
+#define RPC_CWNDSCALE          (1U << RPC_CWNDSHIFT)
+#define RPC_INITCWND           RPC_CWNDSCALE
+#define RPC_MAXCWND(xprt)      ((xprt)->max_reqs << RPC_CWNDSHIFT)
+
+#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
+
+/**
+ * xprt_reserve_xprt - serialize write access to transports
+ * @task: task that is requesting access to the transport
+ *
+ * This prevents mixing the payload of separate requests, and prevents
+ * transport connects from colliding with writes.  No congestion control
+ * is provided.
+ */
+int xprt_reserve_xprt(struct rpc_task *task)
+{
+       struct rpc_xprt *xprt = task->tk_xprt;
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
+               if (task == xprt->snd_task)
+                       return 1;
+               if (task == NULL)
+                       return 0;
+               goto out_sleep;
+       }
+       xprt->snd_task = task;
+       if (req) {
+               req->rq_bytes_sent = 0;
+               req->rq_ntrans++;
+       }
+       return 1;
+
+out_sleep:
+       dprintk("RPC: %4d failed to lock transport %p\n",
+                       task->tk_pid, xprt);
+       task->tk_timeout = 0;
+       task->tk_status = -EAGAIN;
+       if (req && req->rq_ntrans)
+               rpc_sleep_on(&xprt->resend, task, NULL, NULL);
+       else
+               rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+       return 0;
+}
+
+static void xprt_clear_locked(struct rpc_xprt *xprt)
+{
+       xprt->snd_task = NULL;
+       if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
+               smp_mb__before_clear_bit();
+               clear_bit(XPRT_LOCKED, &xprt->state);
+               smp_mb__after_clear_bit();
+       } else
+               schedule_work(&xprt->task_cleanup);
+}
 
 /*
- * Serialize write access to transports, in order to prevent different
- * requests from interfering with each other.
- * Also prevents transport connects from colliding with writes.
+ * xprt_reserve_xprt_cong - serialize write access to transports
+ * @task: task that is requesting access to the transport
+ *
+ * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
+ * integrated into the decision of whether a request is allowed to be
+ * woken up and given access to the transport.
  */
-static int
-__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+int xprt_reserve_xprt_cong(struct rpc_task *task)
 {
+       struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
@@ -79,7 +148,7 @@ __xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
                        return 1;
                goto out_sleep;
        }
-       if (xprt->nocong || __xprt_get_cong(xprt, task)) {
+       if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
@@ -87,9 +156,7 @@ __xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
                }
                return 1;
        }
-       smp_mb__before_clear_bit();
-       clear_bit(XPRT_LOCKED, &xprt->state);
-       smp_mb__after_clear_bit();
+       xprt_clear_locked(xprt);
 out_sleep:
        dprintk("RPC: %4d failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
@@ -101,26 +168,50 @@ out_sleep:
        return 0;
 }
 
-static inline int
-xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        int retval;
 
        spin_lock_bh(&xprt->transport_lock);
-       retval = __xprt_lock_write(xprt, task);
+       retval = xprt->ops->reserve_xprt(task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
 }
 
+static void __xprt_lock_write_next(struct rpc_xprt *xprt)
+{
+       struct rpc_task *task;
+       struct rpc_rqst *req;
 
-static void
-__xprt_lock_write_next(struct rpc_xprt *xprt)
+       if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
+               return;
+
+       task = rpc_wake_up_next(&xprt->resend);
+       if (!task) {
+               task = rpc_wake_up_next(&xprt->sending);
+               if (!task)
+                       goto out_unlock;
+       }
+
+       req = task->tk_rqstp;
+       xprt->snd_task = task;
+       if (req) {
+               req->rq_bytes_sent = 0;
+               req->rq_ntrans++;
+       }
+       return;
+
+out_unlock:
+       xprt_clear_locked(xprt);
+}
+
+static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 {
        struct rpc_task *task;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
-       if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
+       if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
@@ -128,7 +219,7 @@ __xprt_lock_write_next(struct rpc_xprt *xprt)
                if (!task)
                        goto out_unlock;
        }
-       if (xprt->nocong || __xprt_get_cong(xprt, task)) {
+       if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
@@ -138,31 +229,44 @@ __xprt_lock_write_next(struct rpc_xprt *xprt)
                return;
        }
 out_unlock:
-       smp_mb__before_clear_bit();
-       clear_bit(XPRT_LOCKED, &xprt->state);
-       smp_mb__after_clear_bit();
+       xprt_clear_locked(xprt);
 }
 
-/*
- * Releases the transport for use by other requests.
+/**
+ * xprt_release_xprt - allow other requests to use a transport
+ * @xprt: transport with other tasks potentially waiting
+ * @task: task that is releasing access to the transport
+ *
+ * Note that "task" can be NULL.  No congestion control is provided.
  */
-static void
-__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        if (xprt->snd_task == task) {
-               xprt->snd_task = NULL;
-               smp_mb__before_clear_bit();
-               clear_bit(XPRT_LOCKED, &xprt->state);
-               smp_mb__after_clear_bit();
+               xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
 }
 
-static inline void
-xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+/**
+ * xprt_release_xprt_cong - allow other requests to use a transport
+ * @xprt: transport with other tasks potentially waiting
+ * @task: task that is releasing access to the transport
+ *
+ * Note that "task" can be NULL.  Another task is awoken to use the
+ * transport if the transport's congestion window allows it.
+ */
+void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+       if (xprt->snd_task == task) {
+               xprt_clear_locked(xprt);
+               __xprt_lock_write_next_cong(xprt);
+       }
+}
+
+static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        spin_lock_bh(&xprt->transport_lock);
-       __xprt_release_write(xprt, task);
+       xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
 }
 
@@ -197,26 +301,40 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
-       __xprt_lock_write_next(xprt);
+       __xprt_lock_write_next_cong(xprt);
 }
 
-/*
- * Adjust RPC congestion window
+/**
+ * xprt_release_rqst_cong - housekeeping when request is complete
+ * @task: RPC request that recently completed
+ *
+ * Useful for transports that require congestion control.
+ */
+void xprt_release_rqst_cong(struct rpc_task *task)
+{
+       __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
+}
+
+/**
+ * xprt_adjust_cwnd - adjust transport congestion window
+ * @task: recently completed RPC request used to adjust window
+ * @result: result code of completed RPC request
+ *
  * We use a time-smoothed congestion estimator to avoid heavy oscillation.
  */
-static void
-xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
+void xprt_adjust_cwnd(struct rpc_task *task, int result)
 {
-       unsigned long   cwnd;
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = task->tk_xprt;
+       unsigned long cwnd = xprt->cwnd;
 
-       cwnd = xprt->cwnd;
        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
-               __xprt_lock_write_next(xprt);
+               __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
@@ -225,6 +343,7 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
        dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
+       __xprt_put_cong(xprt, req);
 }
 
 /**
@@ -415,10 +534,6 @@ void xprt_connect(struct rpc_task *task)
        dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));
 
-       if (xprt->shutdown) {
-               task->tk_status = -EIO;
-               return;
-       }
        if (!xprt->addr.sin_port) {
                task->tk_status = -EIO;
                return;
@@ -431,7 +546,7 @@ void xprt_connect(struct rpc_task *task)
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;
 
-               task->tk_timeout = RPC_CONNECT_TIMEOUT;
+               task->tk_timeout = xprt->connect_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
                xprt->ops->connect(task);
        }
@@ -498,80 +613,57 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
        return req;
 }
 
+/**
+ * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
+ * @task: RPC request that recently completed
+ *
+ */
+void xprt_update_rtt(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_rtt *rtt = task->tk_client->cl_rtt;
+       unsigned timer = task->tk_msg.rpc_proc->p_timer;
+
+       if (timer) {
+               if (req->rq_ntrans == 1)
+                       rpc_update_rtt(rtt, timer,
+                                       (long)jiffies - req->rq_xtime);
+               rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
+       }
+}
+
 /**
  * xprt_complete_rqst - called when reply processing is complete
- * @xprt: controlling transport
- * @req: RPC request that just completed
+ * @task: RPC request that recently completed
  * @copied: actual number of bytes received from the transport
  *
+ * Caller holds transport lock.
  */
-void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
-{
-       struct rpc_task *task = req->rq_task;
-       struct rpc_clnt *clnt = task->tk_client;
-
-       /* Adjust congestion window */
-       if (!xprt->nocong) {
-               unsigned timer = task->tk_msg.rpc_proc->p_timer;
-               xprt_adjust_cwnd(xprt, copied);
-               __xprt_put_cong(xprt, req);
-               if (timer) {
-                       if (req->rq_ntrans == 1)
-                               rpc_update_rtt(clnt->cl_rtt, timer,
-                                               (long)jiffies - req->rq_xtime);
-                       rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
-               }
-       }
+void xprt_complete_rqst(struct rpc_task *task, int copied)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
 
-#ifdef RPC_PROFILE
-       /* Profile only reads for now */
-       if (copied > 1024) {
-               static unsigned long    nextstat;
-               static unsigned long    pkt_rtt, pkt_len, pkt_cnt;
-
-               pkt_cnt++;
-               pkt_len += req->rq_slen + copied;
-               pkt_rtt += jiffies - req->rq_xtime;
-               if (time_before(nextstat, jiffies)) {
-                       printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
-                       printk("RPC: %ld %ld %ld %ld stat\n",
-                                       jiffies, pkt_cnt, pkt_len, pkt_rtt);
-                       pkt_rtt = pkt_len = pkt_cnt = 0;
-                       nextstat = jiffies + 5 * HZ;
-               }
-       }
-#endif
+       dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
+                       task->tk_pid, ntohl(req->rq_xid), copied);
 
-       dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
        list_del_init(&req->rq_list);
        req->rq_received = req->rq_private_buf.len = copied;
-
-       /* ... and wake up the process. */
        rpc_wake_up_task(task);
-       return;
 }
 
-/*
- * RPC receive timeout handler.
- */
-static void
-xprt_timer(struct rpc_task *task)
+static void xprt_timer(struct rpc_task *task)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
 
-       spin_lock(&xprt->transport_lock);
-       if (req->rq_received)
-               goto out;
-
-       xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
-       __xprt_put_cong(xprt, req);
-
-       dprintk("RPC: %4d xprt_timer (%s request)\n",
-               task->tk_pid, req ? "pending" : "backlogged");
+       dprintk("RPC: %4d xprt_timer\n", task->tk_pid);
 
-       task->tk_status  = -ETIMEDOUT;
-out:
+       spin_lock(&xprt->transport_lock);
+       if (!req->rq_received) {
+               if (xprt->ops->timer)
+                       xprt->ops->timer(task);
+               task->tk_status = -ETIMEDOUT;
+       }
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
        spin_unlock(&xprt->transport_lock);
@@ -590,15 +682,12 @@ int xprt_prepare_transmit(struct rpc_task *task)
 
        dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);
 
-       if (xprt->shutdown)
-               return -EIO;
-
        spin_lock_bh(&xprt->transport_lock);
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
                goto out_unlock;
        }
-       if (!__xprt_lock_write(xprt, task)) {
+       if (!xprt->ops->reserve_xprt(task)) {
                err = -EAGAIN;
                goto out_unlock;
        }
@@ -612,6 +701,14 @@ out_unlock:
        return err;
 }
 
+void
+xprt_abort_transmit(struct rpc_task *task)
+{
+       struct rpc_xprt *xprt = task->tk_xprt;
+
+       xprt_release_write(xprt, task);
+}
+
 /**
  * xprt_transmit - send an RPC request on a transport
  * @task: controlling RPC task
@@ -653,7 +750,7 @@ void xprt_transmit(struct rpc_task *task)
                        task->tk_status = -ENOTCONN;
                else if (!req->rq_received)
                        rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
-               __xprt_release_write(xprt, task);
+               xprt->ops->release_xprt(xprt, task);
                spin_unlock_bh(&xprt->transport_lock);
                return;
        }
@@ -666,7 +763,6 @@ void xprt_transmit(struct rpc_task *task)
 
        switch (status) {
        case -ECONNREFUSED:
-               task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        case -EAGAIN:
        case -ENOTCONN:
@@ -710,11 +806,9 @@ void xprt_reserve(struct rpc_task *task)
        struct rpc_xprt *xprt = task->tk_xprt;
 
        task->tk_status = -EIO;
-       if (!xprt->shutdown) {
-               spin_lock(&xprt->reserve_lock);
-               do_xprt_reserve(task);
-               spin_unlock(&xprt->reserve_lock);
-       }
+       spin_lock(&xprt->reserve_lock);
+       do_xprt_reserve(task);
+       spin_unlock(&xprt->reserve_lock);
 }
 
 static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
@@ -734,7 +828,10 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
        req->rq_timeout = xprt->timeout.to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
+       req->rq_buffer  = NULL;
+       req->rq_bufsize = 0;
        req->rq_xid     = xprt_alloc_xid(xprt);
+       req->rq_release_snd_buf = NULL;
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
 }
@@ -752,23 +849,27 @@ void xprt_release(struct rpc_task *task)
        if (!(req = task->tk_rqstp))
                return;
        spin_lock_bh(&xprt->transport_lock);
-       __xprt_release_write(xprt, task);
-       __xprt_put_cong(xprt, req);
+       xprt->ops->release_xprt(xprt, task);
+       if (xprt->ops->release_request)
+               xprt->ops->release_request(task);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
-       if (list_empty(&xprt->recv) && !xprt->shutdown)
+       if (list_empty(&xprt->recv))
                mod_timer(&xprt->timer,
-                               xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT);
+                               xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
+       xprt->ops->buf_free(task);
        task->tk_rqstp = NULL;
+       if (req->rq_release_snd_buf)
+               req->rq_release_snd_buf(req);
        memset(req, 0, sizeof(*req));   /* mark unused */
 
        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
 
        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
-       xprt_clear_backlog(xprt);
+       rpc_wake_up_next(&xprt->backlog);
        spin_unlock(&xprt->reserve_lock);
 }
 
@@ -820,7 +921,6 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
 
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);
-       init_waitqueue_head(&xprt->cong_wait);
 
        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
@@ -829,6 +929,7 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
        xprt->timer.function = xprt_init_autodisconnect;
        xprt->timer.data = (unsigned long) xprt;
        xprt->last_used = jiffies;
+       xprt->cwnd = RPC_INITCWND;
 
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
@@ -866,23 +967,6 @@ struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rp
        return xprt;
 }
 
-static void xprt_shutdown(struct rpc_xprt *xprt)
-{
-       xprt->shutdown = 1;
-       rpc_wake_up(&xprt->sending);
-       rpc_wake_up(&xprt->resend);
-       xprt_wake_pending_tasks(xprt, -EIO);
-       rpc_wake_up(&xprt->backlog);
-       wake_up(&xprt->cong_wait);
-       del_timer_sync(&xprt->timer);
-}
-
-static int xprt_clear_backlog(struct rpc_xprt *xprt) {
-       rpc_wake_up_next(&xprt->backlog);
-       wake_up(&xprt->cong_wait);
-       return 1;
-}
-
 /**
  * xprt_destroy - destroy an RPC transport, killing off all requests.
  * @xprt: transport to destroy
@@ -891,7 +975,8 @@ static int xprt_clear_backlog(struct rpc_xprt *xprt) {
 int xprt_destroy(struct rpc_xprt *xprt)
 {
        dprintk("RPC:      destroying transport %p\n", xprt);
-       xprt_shutdown(xprt);
+       xprt->shutdown = 1;
+       del_timer_sync(&xprt->timer);
        xprt->ops->destroy(xprt);
        kfree(xprt);