diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 707806fe1a238c957bf7fe67f363ea7b0eb0accd..80857470dc112f15fe18bbd91510e6147243e5e3 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
 #include <linux/types.h>
 #include <linux/interrupt.h>
 #include <linux/workqueue.h>
-#include <linux/random.h>
+#include <linux/net.h>
 
 #include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/metrics.h>
 
 /*
  * Local variables
  */
 
 #ifdef RPC_DEBUG
-# undef  RPC_DEBUG_DATA
 # define RPCDBG_FACILITY       RPCDBG_XPRT
 #endif
 
@@ -62,7 +62,23 @@ static inline void   do_xprt_reserve(struct rpc_task *);
 static void    xprt_connect_status(struct rpc_task *task);
 static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
 
-static int     xprt_clear_backlog(struct rpc_xprt *xprt);
+/*
+ * The transport code maintains an estimate on the maximum number of out-
+ * standing RPC requests, using a smoothed version of the congestion
+ * avoidance implemented in 44BSD. This is basically the Van Jacobson
+ * congestion algorithm: If a retransmit occurs, the congestion window is
+ * halved; otherwise, it is incremented by 1/cwnd when
+ *
+ *     -       a reply is received and
+ *     -       a full number of requests are outstanding and
+ *     -       the congestion window hasn't been updated recently.
+ */
+#define RPC_CWNDSHIFT          (8U)
+#define RPC_CWNDSCALE          (1U << RPC_CWNDSHIFT)
+#define RPC_INITCWND           RPC_CWNDSCALE
+#define RPC_MAXCWND(xprt)      ((xprt)->max_reqs << RPC_CWNDSHIFT)
+
+#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
 
 /**
  * xprt_reserve_xprt - serialize write access to transports
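These macros encode the congestion window in fixed point: one request slot corresponds to RPC_CWNDSCALE (256), and each request in flight is expected to add the same amount to xprt->cong, so RPCXPRT_CONGESTED() fires once a full window's worth of requests is outstanding. As a hedged illustration of the update rule described in the comment above (a standalone sketch, not a helper from this file), the additive-increase/multiplicative-decrease step works out to:

    /* Illustrative sketch only: AIMD update in RPC_CWNDSCALE fixed point. */
    static unsigned long example_update_cwnd(unsigned long cwnd,
                                             unsigned long maxcwnd,
                                             int retransmitted)
    {
            if (retransmitted) {
                    cwnd >>= 1;                     /* halve on retransmit */
                    if (cwnd < RPC_CWNDSCALE)
                            cwnd = RPC_CWNDSCALE;   /* never below one slot */
            } else {
                    /* grow by ~1/cwnd of a slot; (cwnd >> 1) rounds to nearest */
                    cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                    if (cwnd > maxcwnd)
                            cwnd = maxcwnd;
            }
            return cwnd;
    }

Starting from RPC_INITCWND (one slot, 256), the first accepted reply grows the window to 512 (two slots) and the next to 640 (2.5 slots), while a retransmit halves it again, which is the classic Van Jacobson shape.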
@@ -103,6 +119,17 @@ out_sleep:
        return 0;
 }
 
+static void xprt_clear_locked(struct rpc_xprt *xprt)
+{
+       xprt->snd_task = NULL;
+       if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
+               smp_mb__before_clear_bit();
+               clear_bit(XPRT_LOCKED, &xprt->state);
+               smp_mb__after_clear_bit();
+       } else
+               schedule_work(&xprt->task_cleanup);
+}
+
 /*
  * xprt_reserve_xprt_cong - serialize write access to transports
  * @task: task that is requesting access to the transport
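The new xprt_clear_locked() helper centralizes the release side of the XPRT_LOCKED bit that the following hunks switch over to; the twist is that a transport with XPRT_CLOSE_WAIT set (and not shutting down) hands the lock to the task_cleanup worker instead of dropping it. For orientation, the acquire side in xprt_reserve_xprt() looks roughly like the sketch below; this is a paraphrase for illustration, not a hunk of this patch:

    /* Paraphrased acquire side of XPRT_LOCKED, for illustration only. */
    if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
            if (task == xprt->snd_task)
                    return 1;               /* already own write access */
            goto out_sleep;                 /* queue behind the current owner */
    }
    xprt->snd_task = task;                  /* this task now owns the transport */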
@@ -129,9 +156,7 @@ int xprt_reserve_xprt_cong(struct rpc_task *task)
                }
                return 1;
        }
-       smp_mb__before_clear_bit();
-       clear_bit(XPRT_LOCKED, &xprt->state);
-       smp_mb__after_clear_bit();
+       xprt_clear_locked(xprt);
 out_sleep:
        dprintk("RPC: %4d failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
@@ -177,9 +202,7 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
        return;
 
 out_unlock:
-       smp_mb__before_clear_bit();
-       clear_bit(XPRT_LOCKED, &xprt->state);
-       smp_mb__after_clear_bit();
+       xprt_clear_locked(xprt);
 }
 
 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
@@ -206,9 +229,7 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
                return;
        }
 out_unlock:
-       smp_mb__before_clear_bit();
-       clear_bit(XPRT_LOCKED, &xprt->state);
-       smp_mb__after_clear_bit();
+       xprt_clear_locked(xprt);
 }
 
 /**
@@ -221,10 +242,7 @@ out_unlock:
 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        if (xprt->snd_task == task) {
-               xprt->snd_task = NULL;
-               smp_mb__before_clear_bit();
-               clear_bit(XPRT_LOCKED, &xprt->state);
-               smp_mb__after_clear_bit();
+               xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
 }
@@ -240,10 +258,7 @@ void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        if (xprt->snd_task == task) {
-               xprt->snd_task = NULL;
-               smp_mb__before_clear_bit();
-               clear_bit(XPRT_LOCKED, &xprt->state);
-               smp_mb__after_clear_bit();
+               xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
 }
@@ -289,6 +304,17 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
        __xprt_lock_write_next_cong(xprt);
 }
 
+/**
+ * xprt_release_rqst_cong - housekeeping when request is complete
+ * @task: RPC request that recently completed
+ *
+ * Useful for transports that require congestion control.
+ */
+void xprt_release_rqst_cong(struct rpc_task *task)
+{
+       __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
+}
+
 /**
  * xprt_adjust_cwnd - adjust transport congestion window
  * @task: recently completed RPC request used to adjust window
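xprt_release_rqst_cong() exists so that xprt_release() (further down in this diff) can perform congestion housekeeping through the new release_request hook rather than calling __xprt_put_cong() directly. A congestion-controlled transport would presumably wire the hook up in its ops table along these lines; the table below is a hedged sketch with only the relevant entries filled in:

    /* Hypothetical ops table for a congestion-controlled transport;
     * the release_request line is the point of this sketch. */
    static struct rpc_xprt_ops example_cong_ops = {
            .reserve_xprt    = xprt_reserve_xprt_cong,
            .release_xprt    = xprt_release_xprt_cong,
            .release_request = xprt_release_rqst_cong,
            /* ...connect, send_request, timer, destroy and friends... */
    };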
@@ -508,11 +534,7 @@ void xprt_connect(struct rpc_task *task)
        dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));
 
-       if (xprt->shutdown) {
-               task->tk_status = -EIO;
-               return;
-       }
-       if (!xprt->addr.sin_port) {
+       if (!xprt_bound(xprt)) {
                task->tk_status = -EIO;
                return;
        }
@@ -524,8 +546,9 @@ void xprt_connect(struct rpc_task *task)
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;
 
-               task->tk_timeout = RPC_CONNECT_TIMEOUT;
+               task->tk_timeout = xprt->connect_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
+               xprt->stat.connect_start = jiffies;
                xprt->ops->connect(task);
        }
        return;
@@ -536,6 +559,8 @@ static void xprt_connect_status(struct rpc_task *task)
        struct rpc_xprt *xprt = task->tk_xprt;
 
        if (task->tk_status >= 0) {
+               xprt->stat.connect_count++;
+               xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
                dprintk("RPC: %4d xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
@@ -560,13 +585,6 @@ static void xprt_connect_status(struct rpc_task *task)
                                task->tk_pid, -task->tk_status, task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
-               return;
-       }
-
-       /* if soft mounted, just cause this RPC to fail */
-       if (RPC_IS_SOFT(task)) {
-               xprt_release_write(xprt, task);
-               task->tk_status = -EIO;
        }
 }
 
@@ -576,19 +594,17 @@ static void xprt_connect_status(struct rpc_task *task)
  * @xid: RPC XID of incoming reply
  *
  */
-struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
+struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 {
        struct list_head *pos;
-       struct rpc_rqst *req = NULL;
 
        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
-               if (entry->rq_xid == xid) {
-                       req = entry;
-                       break;
-               }
+               if (entry->rq_xid == xid)
+                       return entry;
        }
-       return req;
+       xprt->stat.bad_xids++;
+       return NULL;
 }
 
 /**
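xprt_lookup_rqst() is meant to be driven from a transport's receive path with xprt->transport_lock held, and it now also counts replies whose XID matches nothing as bad_xids. A minimal sketch of such a caller, assuming the XID has already been parsed from the packet; copy_reply_into() is a hypothetical helper, not an existing function:

    /* Illustrative receive-side usage; not part of this patch. */
    static void example_handle_reply(struct rpc_xprt *xprt, __be32 xid,
                                     struct sk_buff *skb)
    {
            struct rpc_rqst *rovr;
            int copied;

            spin_lock(&xprt->transport_lock);
            rovr = xprt_lookup_rqst(xprt, xid);     /* counts bad_xids on miss */
            if (rovr != NULL) {
                    copied = copy_reply_into(rovr, skb);
                    xprt_complete_rqst(rovr->rq_task, copied);
            }
            spin_unlock(&xprt->transport_lock);
    }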
@@ -624,7 +640,12 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
        dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
                        task->tk_pid, ntohl(req->rq_xid), copied);
 
+       task->tk_xprt->stat.recvs++;
+       task->tk_rtt = (long)jiffies - req->rq_xtime;
+
        list_del_init(&req->rq_list);
+       /* Ensure all writes are done before we update req->rq_received */
+       smp_wmb();
        req->rq_received = req->rq_private_buf.len = copied;
        rpc_wake_up_task(task);
 }
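The smp_wmb() added above orders the writes to rq_private_buf.len and the list removal ahead of the store that makes rq_received non-zero; the consumer of the reply is expected to issue the matching read barrier before touching the reply buffer. A hedged sketch of that reader side (the real consumer lives in the RPC client state machine, not in this file, and may differ in detail):

    /* Sketch of the matching reader; decode_reply() is a hypothetical helper. */
    static int example_consume_reply(struct rpc_rqst *req)
    {
            if (!req->rq_received)
                    return -EAGAIN;
            smp_rmb();              /* pairs with the smp_wmb() above */
            return decode_reply(&req->rq_private_buf, req->rq_received);
    }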
@@ -660,9 +681,6 @@ int xprt_prepare_transmit(struct rpc_task *task)
 
        dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);
 
-       if (xprt->shutdown)
-               return -EIO;
-
        spin_lock_bh(&xprt->transport_lock);
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
@@ -682,6 +700,11 @@ out_unlock:
        return err;
 }
 
+void xprt_end_transmit(struct rpc_task *task)
+{
+       xprt_release_write(task->tk_xprt, task);
+}
+
 /**
  * xprt_transmit - send an RPC request on a transport
  * @task: controlling RPC task
@@ -696,7 +719,6 @@ void xprt_transmit(struct rpc_task *task)
 
        dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
 
-       smp_rmb();
        if (!req->rq_received) {
                if (list_empty(&req->rq_list)) {
                        spin_lock_bh(&xprt->transport_lock);
@@ -717,13 +739,18 @@ void xprt_transmit(struct rpc_task *task)
        if (status == 0) {
                dprintk("RPC: %4d xmit complete\n", task->tk_pid);
                spin_lock_bh(&xprt->transport_lock);
+
                xprt->ops->set_retrans_timeout(task);
+
+               xprt->stat.sends++;
+               xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
+               xprt->stat.bklog_u += xprt->backlog.qlen;
+
                /* Don't race with disconnect */
                if (!xprt_connected(xprt))
                        task->tk_status = -ENOTCONN;
                else if (!req->rq_received)
                        rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
-               xprt->ops->release_xprt(xprt, task);
                spin_unlock_bh(&xprt->transport_lock);
                return;
        }
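The counters bumped here accumulate per-send snapshots: sends - recvs is the number of requests still outstanding at the moment of this send, and backlog.qlen is the depth of the slot-starved queue, so dividing req_u or bklog_u by sends later yields an average utilisation figure. How they are reported is outside this file; the sketch below assumes a seq_file-style statistics callback and unsigned long counters, both of which are assumptions rather than part of this hunk:

    /* Assumed reporting hook; the stat field names come from this diff,
     * the callback shape and counter types are guesses. */
    static void example_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
    {
            seq_printf(seq, "xprt: sends %lu recvs %lu bad_xids %lu "
                            "req_u %lu bklog_u %lu\n",
                            xprt->stat.sends, xprt->stat.recvs,
                            xprt->stat.bad_xids,
                            xprt->stat.req_u, xprt->stat.bklog_u);
    }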
@@ -733,19 +760,8 @@ void xprt_transmit(struct rpc_task *task)
         *       schedq, and being picked up by a parallel run of rpciod().
         */
        task->tk_status = status;
-
-       switch (status) {
-       case -ECONNREFUSED:
-               task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
+       if (status == -ECONNREFUSED)
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
-       case -EAGAIN:
-       case -ENOTCONN:
-               return;
-       default:
-               break;
-       }
-       xprt_release_write(xprt, task);
-       return;
 }
 
 static inline void do_xprt_reserve(struct rpc_task *task)
@@ -780,21 +796,19 @@ void xprt_reserve(struct rpc_task *task)
        struct rpc_xprt *xprt = task->tk_xprt;
 
        task->tk_status = -EIO;
-       if (!xprt->shutdown) {
-               spin_lock(&xprt->reserve_lock);
-               do_xprt_reserve(task);
-               spin_unlock(&xprt->reserve_lock);
-       }
+       spin_lock(&xprt->reserve_lock);
+       do_xprt_reserve(task);
+       spin_unlock(&xprt->reserve_lock);
 }
 
-static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
+static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
 {
        return xprt->xid++;
 }
 
 static inline void xprt_init_xid(struct rpc_xprt *xprt)
 {
-       get_random_bytes(&xprt->xid, sizeof(xprt->xid));
+       xprt->xid = net_random();
 }
 
 static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
@@ -804,7 +818,11 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
        req->rq_timeout = xprt->timeout.to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
+       req->rq_buffer  = NULL;
+       req->rq_bufsize = 0;
        req->rq_xid     = xprt_alloc_xid(xprt);
+       req->rq_release_snd_buf = NULL;
+       xprt_reset_majortimeo(req);
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
 }
@@ -821,24 +839,29 @@ void xprt_release(struct rpc_task *task)
 
        if (!(req = task->tk_rqstp))
                return;
+       rpc_count_iostats(task);
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
-       __xprt_put_cong(xprt, req);
+       if (xprt->ops->release_request)
+               xprt->ops->release_request(task);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
-       if (list_empty(&xprt->recv) && !xprt->shutdown)
+       if (list_empty(&xprt->recv))
                mod_timer(&xprt->timer,
-                               xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT);
+                               xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
+       xprt->ops->buf_free(task);
        task->tk_rqstp = NULL;
+       if (req->rq_release_snd_buf)
+               req->rq_release_snd_buf(req);
        memset(req, 0, sizeof(*req));   /* mark unused */
 
        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
 
        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
-       xprt_clear_backlog(xprt);
+       rpc_wake_up_next(&xprt->backlog);
        spin_unlock(&xprt->reserve_lock);
 }
 
@@ -858,17 +881,32 @@ void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long i
        to->to_exponential = 0;
 }
 
-static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
+/**
+ * xprt_create_transport - create an RPC transport
+ * @proto: requested transport protocol
+ * @ap: remote peer address
+ * @size: length of address
+ * @to: timeout parameters
+ *
+ */
+struct rpc_xprt *xprt_create_transport(int proto, struct sockaddr *ap, size_t size, struct rpc_timeout *to)
 {
        int result;
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
 
-       if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
+       if ((xprt = kzalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) {
+               dprintk("RPC:      xprt_create_transport: no memory\n");
                return ERR_PTR(-ENOMEM);
-       memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
-
-       xprt->addr = *ap;
+       }
+       if (size <= sizeof(xprt->addr)) {
+               memcpy(&xprt->addr, ap, size);
+               xprt->addrlen = size;
+       } else {
+               kfree(xprt);
+               dprintk("RPC:      xprt_create_transport: address too large\n");
+               return ERR_PTR(-EBADF);
+       }
 
        switch (proto) {
        case IPPROTO_UDP:
@@ -880,17 +918,17 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
        default:
                printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
                                proto);
-               result = -EIO;
-               break;
+               return ERR_PTR(-EIO);
        }
        if (result) {
                kfree(xprt);
+               dprintk("RPC:      xprt_create_transport: failed, %d\n", result);
                return ERR_PTR(result);
        }
 
+       kref_init(&xprt->kref);
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);
-       init_waitqueue_head(&xprt->cong_wait);
 
        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
@@ -899,7 +937,9 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
        xprt->timer.function = xprt_init_autodisconnect;
        xprt->timer.data = (unsigned long) xprt;
        xprt->last_used = jiffies;
+       xprt->cwnd = RPC_INITCWND;
 
+       rpc_init_wait_queue(&xprt->binding, "xprt_binding");
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
@@ -913,57 +953,43 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
 
        dprintk("RPC:      created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);
-       
+
        return xprt;
 }
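Callers now pass a generic struct sockaddr plus its length, so the same entry point can grow beyond IPv4 later. A hedged sketch of how the client setup path might invoke it for a UDP transport; the port, retry count and timeout values are arbitrary examples:

    /* Illustrative caller; values are examples, error handling trimmed. */
    static struct rpc_xprt *example_create_udp_xprt(void)
    {
            struct sockaddr_in sin = {
                    .sin_family = AF_INET,
                    .sin_port   = htons(2049),
            };
            struct rpc_timeout timeo;

            sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
            xprt_set_timeout(&timeo, 5, 5 * HZ);    /* retries, initial timeout */
            return xprt_create_transport(IPPROTO_UDP, (struct sockaddr *)&sin,
                                         sizeof(sin), &timeo);
    }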
 
 /**
- * xprt_create_proto - create an RPC client transport
- * @proto: requested transport protocol
- * @sap: remote peer's address
- * @to: timeout parameters for new transport
+ * xprt_destroy - destroy an RPC transport, killing off all requests.
+ * @kref: kref for the transport to destroy
  *
  */
-struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
+static void xprt_destroy(struct kref *kref)
 {
-       struct rpc_xprt *xprt;
-
-       xprt = xprt_setup(proto, sap, to);
-       if (IS_ERR(xprt))
-               dprintk("RPC:      xprt_create_proto failed\n");
-       else
-               dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);
-       return xprt;
-}
+       struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);
 
-static void xprt_shutdown(struct rpc_xprt *xprt)
-{
+       dprintk("RPC:      destroying transport %p\n", xprt);
        xprt->shutdown = 1;
-       rpc_wake_up(&xprt->sending);
-       rpc_wake_up(&xprt->resend);
-       xprt_wake_pending_tasks(xprt, -EIO);
-       rpc_wake_up(&xprt->backlog);
-       wake_up(&xprt->cong_wait);
        del_timer_sync(&xprt->timer);
+       xprt->ops->destroy(xprt);
+       kfree(xprt);
 }
 
-static int xprt_clear_backlog(struct rpc_xprt *xprt) {
-       rpc_wake_up_next(&xprt->backlog);
-       wake_up(&xprt->cong_wait);
-       return 1;
+/**
+ * xprt_put - release a reference to an RPC transport.
+ * @xprt: pointer to the transport
+ *
+ */
+void xprt_put(struct rpc_xprt *xprt)
+{
+       kref_put(&xprt->kref, xprt_destroy);
 }
 
 /**
- * xprt_destroy - destroy an RPC transport, killing off all requests.
- * @xprt: transport to destroy
+ * xprt_get - return a reference to an RPC transport.
+ * @xprt: pointer to the transport
  *
  */
-int xprt_destroy(struct rpc_xprt *xprt)
+struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
 {
-       dprintk("RPC:      destroying transport %p\n", xprt);
-       xprt_shutdown(xprt);
-       xprt->ops->destroy(xprt);
-       kfree(xprt);
-
-       return 0;
+       kref_get(&xprt->kref);
+       return xprt;
 }
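Life-cycle management now follows the usual kref pattern: kref_init() in xprt_create_transport() provides the creator's reference, anyone who stores the pointer takes another with xprt_get(), and the final xprt_put() runs xprt_destroy(). A brief usage sketch; the example_client structure is purely illustrative:

    /* Illustrative kref usage; example_client is not a real structure. */
    struct example_client {
            struct rpc_xprt *xprt;
    };

    static void example_bind(struct example_client *clnt, struct rpc_xprt *xprt)
    {
            clnt->xprt = xprt_get(xprt);    /* take our own reference */
    }

    static void example_unbind(struct example_client *clnt)
    {
            xprt_put(clnt->xprt);           /* may trigger xprt_destroy() */
            clnt->xprt = NULL;
    }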