]> pilppa.org Git - linux-2.6-omap-h63xx.git/blob - net/sunrpc/clnt.c
SUNRPC: Move rpc_register_client and friends into net/sunrpc/clnt.c
[linux-2.6-omap-h63xx.git] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may be have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */
23
24 #include <asm/system.h>
25
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/mm.h>
29 #include <linux/slab.h>
30 #include <linux/smp_lock.h>
31 #include <linux/utsname.h>
32 #include <linux/workqueue.h>
33
34 #include <linux/sunrpc/clnt.h>
35 #include <linux/sunrpc/rpc_pipe_fs.h>
36 #include <linux/sunrpc/metrics.h>
37
38
39 #ifdef RPC_DEBUG
40 # define RPCDBG_FACILITY        RPCDBG_CALL
41 #endif
42
43 #define dprint_status(t)                                        \
44         dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
45                         __FUNCTION__, t->tk_status)
46
47 /*
48  * All RPC clients are linked into this list
49  */
50 static LIST_HEAD(all_clients);
51 static DEFINE_SPINLOCK(rpc_client_lock);
52
53 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
54
55
56 static void     call_start(struct rpc_task *task);
57 static void     call_reserve(struct rpc_task *task);
58 static void     call_reserveresult(struct rpc_task *task);
59 static void     call_allocate(struct rpc_task *task);
60 static void     call_encode(struct rpc_task *task);
61 static void     call_decode(struct rpc_task *task);
62 static void     call_bind(struct rpc_task *task);
63 static void     call_bind_status(struct rpc_task *task);
64 static void     call_transmit(struct rpc_task *task);
65 static void     call_status(struct rpc_task *task);
66 static void     call_transmit_status(struct rpc_task *task);
67 static void     call_refresh(struct rpc_task *task);
68 static void     call_refreshresult(struct rpc_task *task);
69 static void     call_timeout(struct rpc_task *task);
70 static void     call_connect(struct rpc_task *task);
71 static void     call_connect_status(struct rpc_task *task);
72 static __be32 * call_header(struct rpc_task *task);
73 static __be32 * call_verify(struct rpc_task *task);
74
75 static void rpc_register_client(struct rpc_clnt *clnt)
76 {
77         spin_lock(&rpc_client_lock);
78         list_add(&clnt->cl_clients, &all_clients);
79         spin_unlock(&rpc_client_lock);
80 }
81
82 static void rpc_unregister_client(struct rpc_clnt *clnt)
83 {
84         spin_lock(&rpc_client_lock);
85         list_del(&clnt->cl_clients);
86         spin_unlock(&rpc_client_lock);
87 }
88
89 static int
90 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
91 {
92         static uint32_t clntid;
93         int error;
94
95         clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
96         clnt->cl_dentry = ERR_PTR(-ENOENT);
97         if (dir_name == NULL)
98                 return 0;
99
100         clnt->cl_vfsmnt = rpc_get_mount();
101         if (IS_ERR(clnt->cl_vfsmnt))
102                 return PTR_ERR(clnt->cl_vfsmnt);
103
104         for (;;) {
105                 snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
106                                 "%s/clnt%x", dir_name,
107                                 (unsigned int)clntid++);
108                 clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
109                 clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
110                 if (!IS_ERR(clnt->cl_dentry))
111                         return 0;
112                 error = PTR_ERR(clnt->cl_dentry);
113                 if (error != -EEXIST) {
114                         printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
115                                         clnt->cl_pathname, error);
116                         rpc_put_mount();
117                         return error;
118                 }
119         }
120 }
121
122 static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor)
123 {
124         struct rpc_version      *version;
125         struct rpc_clnt         *clnt = NULL;
126         struct rpc_auth         *auth;
127         int err;
128         int len;
129
130         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
131                         program->name, servname, xprt);
132
133         err = rpciod_up();
134         if (err)
135                 goto out_no_rpciod;
136         err = -EINVAL;
137         if (!xprt)
138                 goto out_no_xprt;
139         if (vers >= program->nrvers || !(version = program->version[vers]))
140                 goto out_err;
141
142         err = -ENOMEM;
143         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
144         if (!clnt)
145                 goto out_err;
146         clnt->cl_parent = clnt;
147
148         clnt->cl_server = clnt->cl_inline_name;
149         len = strlen(servname) + 1;
150         if (len > sizeof(clnt->cl_inline_name)) {
151                 char *buf = kmalloc(len, GFP_KERNEL);
152                 if (buf != 0)
153                         clnt->cl_server = buf;
154                 else
155                         len = sizeof(clnt->cl_inline_name);
156         }
157         strlcpy(clnt->cl_server, servname, len);
158
159         clnt->cl_xprt     = xprt;
160         clnt->cl_procinfo = version->procs;
161         clnt->cl_maxproc  = version->nrprocs;
162         clnt->cl_protname = program->name;
163         clnt->cl_prog     = program->number;
164         clnt->cl_vers     = version->number;
165         clnt->cl_stats    = program->stats;
166         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
167         err = -ENOMEM;
168         if (clnt->cl_metrics == NULL)
169                 goto out_no_stats;
170         clnt->cl_program  = program;
171         INIT_LIST_HEAD(&clnt->cl_tasks);
172         spin_lock_init(&clnt->cl_lock);
173
174         if (!xprt_bound(clnt->cl_xprt))
175                 clnt->cl_autobind = 1;
176
177         clnt->cl_rtt = &clnt->cl_rtt_default;
178         rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
179
180         kref_init(&clnt->cl_kref);
181
182         err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
183         if (err < 0)
184                 goto out_no_path;
185
186         auth = rpcauth_create(flavor, clnt);
187         if (IS_ERR(auth)) {
188                 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
189                                 flavor);
190                 err = PTR_ERR(auth);
191                 goto out_no_auth;
192         }
193
194         /* save the nodename */
195         clnt->cl_nodelen = strlen(utsname()->nodename);
196         if (clnt->cl_nodelen > UNX_MAXNODENAME)
197                 clnt->cl_nodelen = UNX_MAXNODENAME;
198         memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
199         rpc_register_client(clnt);
200         return clnt;
201
202 out_no_auth:
203         if (!IS_ERR(clnt->cl_dentry)) {
204                 rpc_rmdir(clnt->cl_dentry);
205                 rpc_put_mount();
206         }
207 out_no_path:
208         rpc_free_iostats(clnt->cl_metrics);
209 out_no_stats:
210         if (clnt->cl_server != clnt->cl_inline_name)
211                 kfree(clnt->cl_server);
212         kfree(clnt);
213 out_err:
214         xprt_put(xprt);
215 out_no_xprt:
216         rpciod_down();
217 out_no_rpciod:
218         return ERR_PTR(err);
219 }
220
221 /*
222  * rpc_create - create an RPC client and transport with one call
223  * @args: rpc_clnt create argument structure
224  *
225  * Creates and initializes an RPC transport and an RPC client.
226  *
227  * It can ping the server in order to determine if it is up, and to see if
228  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
229  * this behavior so asynchronous tasks can also use rpc_create.
230  */
231 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
232 {
233         struct rpc_xprt *xprt;
234         struct rpc_clnt *clnt;
235
236         xprt = xprt_create_transport(args->protocol, args->address,
237                                         args->addrsize, args->timeout);
238         if (IS_ERR(xprt))
239                 return (struct rpc_clnt *)xprt;
240
241         /*
242          * By default, kernel RPC client connects from a reserved port.
243          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
244          * but it is always enabled for rpciod, which handles the connect
245          * operation.
246          */
247         xprt->resvport = 1;
248         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
249                 xprt->resvport = 0;
250
251         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
252                         args->program->name, args->servername, xprt);
253
254         clnt = rpc_new_client(xprt, args->servername, args->program,
255                                 args->version, args->authflavor);
256         if (IS_ERR(clnt))
257                 return clnt;
258
259         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
260                 int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
261                 if (err != 0) {
262                         rpc_shutdown_client(clnt);
263                         return ERR_PTR(err);
264                 }
265         }
266
267         clnt->cl_softrtry = 1;
268         if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
269                 clnt->cl_softrtry = 0;
270
271         if (args->flags & RPC_CLNT_CREATE_INTR)
272                 clnt->cl_intr = 1;
273         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
274                 clnt->cl_autobind = 1;
275         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
276                 clnt->cl_discrtry = 1;
277
278         return clnt;
279 }
280 EXPORT_SYMBOL_GPL(rpc_create);
281
282 /*
283  * This function clones the RPC client structure. It allows us to share the
284  * same transport while varying parameters such as the authentication
285  * flavour.
286  */
287 struct rpc_clnt *
288 rpc_clone_client(struct rpc_clnt *clnt)
289 {
290         struct rpc_clnt *new;
291         int err = -ENOMEM;
292
293         new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
294         if (!new)
295                 goto out_no_clnt;
296         new->cl_parent = clnt;
297         /* Turn off autobind on clones */
298         new->cl_autobind = 0;
299         INIT_LIST_HEAD(&new->cl_tasks);
300         spin_lock_init(&new->cl_lock);
301         rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
302         new->cl_metrics = rpc_alloc_iostats(clnt);
303         if (new->cl_metrics == NULL)
304                 goto out_no_stats;
305         kref_init(&new->cl_kref);
306         err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
307         if (err != 0)
308                 goto out_no_path;
309         if (new->cl_auth)
310                 atomic_inc(&new->cl_auth->au_count);
311         xprt_get(clnt->cl_xprt);
312         kref_get(&clnt->cl_kref);
313         rpc_register_client(new);
314         rpciod_up();
315         return new;
316 out_no_path:
317         rpc_free_iostats(new->cl_metrics);
318 out_no_stats:
319         kfree(new);
320 out_no_clnt:
321         dprintk("RPC:       %s: returned error %d\n", __FUNCTION__, err);
322         return ERR_PTR(err);
323 }
324
325 /*
326  * Properly shut down an RPC client, terminating all outstanding
327  * requests.
328  */
329 void rpc_shutdown_client(struct rpc_clnt *clnt)
330 {
331         dprintk("RPC:       shutting down %s client for %s\n",
332                         clnt->cl_protname, clnt->cl_server);
333
334         while (!list_empty(&clnt->cl_tasks)) {
335                 rpc_killall_tasks(clnt);
336                 wait_event_timeout(destroy_wait,
337                         list_empty(&clnt->cl_tasks), 1*HZ);
338         }
339
340         rpc_release_client(clnt);
341 }
342
343 /*
344  * Free an RPC client
345  */
346 static void
347 rpc_free_client(struct kref *kref)
348 {
349         struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);
350
351         dprintk("RPC:       destroying %s client for %s\n",
352                         clnt->cl_protname, clnt->cl_server);
353         if (clnt->cl_auth) {
354                 rpcauth_destroy(clnt->cl_auth);
355                 clnt->cl_auth = NULL;
356         }
357         if (!IS_ERR(clnt->cl_dentry)) {
358                 rpc_rmdir(clnt->cl_dentry);
359                 rpc_put_mount();
360         }
361         if (clnt->cl_parent != clnt) {
362                 rpc_release_client(clnt->cl_parent);
363                 goto out_free;
364         }
365         if (clnt->cl_server != clnt->cl_inline_name)
366                 kfree(clnt->cl_server);
367 out_free:
368         rpc_unregister_client(clnt);
369         rpc_free_iostats(clnt->cl_metrics);
370         clnt->cl_metrics = NULL;
371         xprt_put(clnt->cl_xprt);
372         rpciod_down();
373         kfree(clnt);
374 }
375
376 /*
377  * Release reference to the RPC client
378  */
379 void
380 rpc_release_client(struct rpc_clnt *clnt)
381 {
382         dprintk("RPC:       rpc_release_client(%p)\n", clnt);
383
384         if (list_empty(&clnt->cl_tasks))
385                 wake_up(&destroy_wait);
386         kref_put(&clnt->cl_kref, rpc_free_client);
387 }
388
389 /**
390  * rpc_bind_new_program - bind a new RPC program to an existing client
391  * @old - old rpc_client
392  * @program - rpc program to set
393  * @vers - rpc program version
394  *
395  * Clones the rpc client and sets up a new RPC program. This is mainly
396  * of use for enabling different RPC programs to share the same transport.
397  * The Sun NFSv2/v3 ACL protocol can do this.
398  */
399 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
400                                       struct rpc_program *program,
401                                       int vers)
402 {
403         struct rpc_clnt *clnt;
404         struct rpc_version *version;
405         int err;
406
407         BUG_ON(vers >= program->nrvers || !program->version[vers]);
408         version = program->version[vers];
409         clnt = rpc_clone_client(old);
410         if (IS_ERR(clnt))
411                 goto out;
412         clnt->cl_procinfo = version->procs;
413         clnt->cl_maxproc  = version->nrprocs;
414         clnt->cl_protname = program->name;
415         clnt->cl_prog     = program->number;
416         clnt->cl_vers     = version->number;
417         clnt->cl_stats    = program->stats;
418         err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
419         if (err != 0) {
420                 rpc_shutdown_client(clnt);
421                 clnt = ERR_PTR(err);
422         }
423 out:
424         return clnt;
425 }
426
427 /*
428  * Default callback for async RPC calls
429  */
430 static void
431 rpc_default_callback(struct rpc_task *task, void *data)
432 {
433 }
434
435 static const struct rpc_call_ops rpc_default_ops = {
436         .rpc_call_done = rpc_default_callback,
437 };
438
439 /*
440  *      Export the signal mask handling for synchronous code that
441  *      sleeps on RPC calls
442  */
443 #define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))
444
445 static void rpc_save_sigmask(sigset_t *oldset, int intr)
446 {
447         unsigned long   sigallow = sigmask(SIGKILL);
448         sigset_t sigmask;
449
450         /* Block all signals except those listed in sigallow */
451         if (intr)
452                 sigallow |= RPC_INTR_SIGNALS;
453         siginitsetinv(&sigmask, sigallow);
454         sigprocmask(SIG_BLOCK, &sigmask, oldset);
455 }
456
457 static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
458 {
459         rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
460 }
461
462 static inline void rpc_restore_sigmask(sigset_t *oldset)
463 {
464         sigprocmask(SIG_SETMASK, oldset, NULL);
465 }
466
467 void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
468 {
469         rpc_save_sigmask(oldset, clnt->cl_intr);
470 }
471
472 void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
473 {
474         rpc_restore_sigmask(oldset);
475 }
476
477 /*
478  * New rpc_call implementation
479  */
480 int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
481 {
482         struct rpc_task *task;
483         sigset_t        oldset;
484         int             status;
485
486         BUG_ON(flags & RPC_TASK_ASYNC);
487
488         task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
489         if (task == NULL)
490                 return -ENOMEM;
491
492         /* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
493         rpc_task_sigmask(task, &oldset);
494
495         /* Set up the call info struct and execute the task */
496         rpc_call_setup(task, msg, 0);
497         if (task->tk_status == 0) {
498                 atomic_inc(&task->tk_count);
499                 rpc_execute(task);
500         }
501         status = task->tk_status;
502         rpc_put_task(task);
503         rpc_restore_sigmask(&oldset);
504         return status;
505 }
506
507 /*
508  * New rpc_call implementation
509  */
510 int
511 rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
512                const struct rpc_call_ops *tk_ops, void *data)
513 {
514         struct rpc_task *task;
515         sigset_t        oldset;
516         int             status;
517
518         flags |= RPC_TASK_ASYNC;
519
520         /* Create/initialize a new RPC task */
521         status = -ENOMEM;
522         if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
523                 goto out_release;
524
525         /* Mask signals on GSS_AUTH upcalls */
526         rpc_task_sigmask(task, &oldset);
527
528         rpc_call_setup(task, msg, 0);
529
530         /* Set up the call info struct and execute the task */
531         status = task->tk_status;
532         if (status == 0)
533                 rpc_execute(task);
534         else
535                 rpc_put_task(task);
536
537         rpc_restore_sigmask(&oldset);
538         return status;
539 out_release:
540         rpc_release_calldata(tk_ops, data);
541         return status;
542 }
543
544
545 void
546 rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
547 {
548         task->tk_msg   = *msg;
549         task->tk_flags |= flags;
550         /* Bind the user cred */
551         if (task->tk_msg.rpc_cred != NULL)
552                 rpcauth_holdcred(task);
553         else
554                 rpcauth_bindcred(task);
555
556         if (task->tk_status == 0)
557                 task->tk_action = call_start;
558         else
559                 task->tk_action = rpc_exit_task;
560 }
561
562 /**
563  * rpc_peeraddr - extract remote peer address from clnt's xprt
564  * @clnt: RPC client structure
565  * @buf: target buffer
566  * @size: length of target buffer
567  *
568  * Returns the number of bytes that are actually in the stored address.
569  */
570 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
571 {
572         size_t bytes;
573         struct rpc_xprt *xprt = clnt->cl_xprt;
574
575         bytes = sizeof(xprt->addr);
576         if (bytes > bufsize)
577                 bytes = bufsize;
578         memcpy(buf, &clnt->cl_xprt->addr, bytes);
579         return xprt->addrlen;
580 }
581 EXPORT_SYMBOL_GPL(rpc_peeraddr);
582
583 /**
584  * rpc_peeraddr2str - return remote peer address in printable format
585  * @clnt: RPC client structure
586  * @format: address format
587  *
588  */
589 char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
590 {
591         struct rpc_xprt *xprt = clnt->cl_xprt;
592
593         if (xprt->address_strings[format] != NULL)
594                 return xprt->address_strings[format];
595         else
596                 return "unprintable";
597 }
598 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
599
600 void
601 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
602 {
603         struct rpc_xprt *xprt = clnt->cl_xprt;
604         if (xprt->ops->set_buffer_size)
605                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
606 }
607
608 /*
609  * Return size of largest payload RPC client can support, in bytes
610  *
611  * For stream transports, this is one RPC record fragment (see RFC
612  * 1831), as we don't support multi-record requests yet.  For datagram
613  * transports, this is the size of an IP packet minus the IP, UDP, and
614  * RPC header sizes.
615  */
616 size_t rpc_max_payload(struct rpc_clnt *clnt)
617 {
618         return clnt->cl_xprt->max_payload;
619 }
620 EXPORT_SYMBOL_GPL(rpc_max_payload);
621
622 /**
623  * rpc_force_rebind - force transport to check that remote port is unchanged
624  * @clnt: client to rebind
625  *
626  */
627 void rpc_force_rebind(struct rpc_clnt *clnt)
628 {
629         if (clnt->cl_autobind)
630                 xprt_clear_bound(clnt->cl_xprt);
631 }
632 EXPORT_SYMBOL_GPL(rpc_force_rebind);
633
634 /*
635  * Restart an (async) RPC call. Usually called from within the
636  * exit handler.
637  */
638 void
639 rpc_restart_call(struct rpc_task *task)
640 {
641         if (RPC_ASSASSINATED(task))
642                 return;
643
644         task->tk_action = call_start;
645 }
646
647 /*
648  * 0.  Initial state
649  *
650  *     Other FSM states can be visited zero or more times, but
651  *     this state is visited exactly once for each RPC.
652  */
653 static void
654 call_start(struct rpc_task *task)
655 {
656         struct rpc_clnt *clnt = task->tk_client;
657
658         dprintk("RPC: %5u call_start %s%d proc %d (%s)\n", task->tk_pid,
659                         clnt->cl_protname, clnt->cl_vers,
660                         task->tk_msg.rpc_proc->p_proc,
661                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
662
663         /* Increment call count */
664         task->tk_msg.rpc_proc->p_count++;
665         clnt->cl_stats->rpccnt++;
666         task->tk_action = call_reserve;
667 }
668
669 /*
670  * 1.   Reserve an RPC call slot
671  */
672 static void
673 call_reserve(struct rpc_task *task)
674 {
675         dprint_status(task);
676
677         if (!rpcauth_uptodatecred(task)) {
678                 task->tk_action = call_refresh;
679                 return;
680         }
681
682         task->tk_status  = 0;
683         task->tk_action  = call_reserveresult;
684         xprt_reserve(task);
685 }
686
687 /*
688  * 1b.  Grok the result of xprt_reserve()
689  */
690 static void
691 call_reserveresult(struct rpc_task *task)
692 {
693         int status = task->tk_status;
694
695         dprint_status(task);
696
697         /*
698          * After a call to xprt_reserve(), we must have either
699          * a request slot or else an error status.
700          */
701         task->tk_status = 0;
702         if (status >= 0) {
703                 if (task->tk_rqstp) {
704                         task->tk_action = call_allocate;
705                         return;
706                 }
707
708                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
709                                 __FUNCTION__, status);
710                 rpc_exit(task, -EIO);
711                 return;
712         }
713
714         /*
715          * Even though there was an error, we may have acquired
716          * a request slot somehow.  Make sure not to leak it.
717          */
718         if (task->tk_rqstp) {
719                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
720                                 __FUNCTION__, status);
721                 xprt_release(task);
722         }
723
724         switch (status) {
725         case -EAGAIN:   /* woken up; retry */
726                 task->tk_action = call_reserve;
727                 return;
728         case -EIO:      /* probably a shutdown */
729                 break;
730         default:
731                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
732                                 __FUNCTION__, status);
733                 break;
734         }
735         rpc_exit(task, status);
736 }
737
738 /*
739  * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
740  *      (Note: buffer memory is freed in xprt_release).
741  */
742 static void
743 call_allocate(struct rpc_task *task)
744 {
745         unsigned int slack = task->tk_auth->au_cslack;
746         struct rpc_rqst *req = task->tk_rqstp;
747         struct rpc_xprt *xprt = task->tk_xprt;
748         struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
749
750         dprint_status(task);
751
752         task->tk_status = 0;
753         task->tk_action = call_bind;
754
755         if (req->rq_buffer)
756                 return;
757
758         if (proc->p_proc != 0) {
759                 BUG_ON(proc->p_arglen == 0);
760                 if (proc->p_decode != NULL)
761                         BUG_ON(proc->p_replen == 0);
762         }
763
764         /*
765          * Calculate the size (in quads) of the RPC call
766          * and reply headers, and convert both values
767          * to byte sizes.
768          */
769         req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
770         req->rq_callsize <<= 2;
771         req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
772         req->rq_rcvsize <<= 2;
773
774         req->rq_buffer = xprt->ops->buf_alloc(task,
775                                         req->rq_callsize + req->rq_rcvsize);
776         if (req->rq_buffer != NULL)
777                 return;
778
779         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
780
781         if (RPC_IS_ASYNC(task) || !signalled()) {
782                 xprt_release(task);
783                 task->tk_action = call_reserve;
784                 rpc_delay(task, HZ>>4);
785                 return;
786         }
787
788         rpc_exit(task, -ERESTARTSYS);
789 }
790
791 static inline int
792 rpc_task_need_encode(struct rpc_task *task)
793 {
794         return task->tk_rqstp->rq_snd_buf.len == 0;
795 }
796
797 static inline void
798 rpc_task_force_reencode(struct rpc_task *task)
799 {
800         task->tk_rqstp->rq_snd_buf.len = 0;
801 }
802
803 static inline void
804 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
805 {
806         buf->head[0].iov_base = start;
807         buf->head[0].iov_len = len;
808         buf->tail[0].iov_len = 0;
809         buf->page_len = 0;
810         buf->len = 0;
811         buf->buflen = len;
812 }
813
814 /*
815  * 3.   Encode arguments of an RPC call
816  */
817 static void
818 call_encode(struct rpc_task *task)
819 {
820         struct rpc_rqst *req = task->tk_rqstp;
821         kxdrproc_t      encode;
822         __be32          *p;
823
824         dprint_status(task);
825
826         rpc_xdr_buf_init(&req->rq_snd_buf,
827                          req->rq_buffer,
828                          req->rq_callsize);
829         rpc_xdr_buf_init(&req->rq_rcv_buf,
830                          (char *)req->rq_buffer + req->rq_callsize,
831                          req->rq_rcvsize);
832
833         /* Encode header and provided arguments */
834         encode = task->tk_msg.rpc_proc->p_encode;
835         if (!(p = call_header(task))) {
836                 printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
837                 rpc_exit(task, -EIO);
838                 return;
839         }
840         if (encode == NULL)
841                 return;
842
843         lock_kernel();
844         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
845                         task->tk_msg.rpc_argp);
846         unlock_kernel();
847         if (task->tk_status == -ENOMEM) {
848                 /* XXX: Is this sane? */
849                 rpc_delay(task, 3*HZ);
850                 task->tk_status = -EAGAIN;
851         }
852 }
853
854 /*
855  * 4.   Get the server port number if not yet set
856  */
857 static void
858 call_bind(struct rpc_task *task)
859 {
860         struct rpc_xprt *xprt = task->tk_xprt;
861
862         dprint_status(task);
863
864         task->tk_action = call_connect;
865         if (!xprt_bound(xprt)) {
866                 task->tk_action = call_bind_status;
867                 task->tk_timeout = xprt->bind_timeout;
868                 xprt->ops->rpcbind(task);
869         }
870 }
871
872 /*
873  * 4a.  Sort out bind result
874  */
875 static void
876 call_bind_status(struct rpc_task *task)
877 {
878         int status = -EACCES;
879
880         if (task->tk_status >= 0) {
881                 dprint_status(task);
882                 task->tk_status = 0;
883                 task->tk_action = call_connect;
884                 return;
885         }
886
887         switch (task->tk_status) {
888         case -EACCES:
889                 dprintk("RPC: %5u remote rpcbind: RPC program/version "
890                                 "unavailable\n", task->tk_pid);
891                 rpc_delay(task, 3*HZ);
892                 goto retry_timeout;
893         case -ETIMEDOUT:
894                 dprintk("RPC: %5u rpcbind request timed out\n",
895                                 task->tk_pid);
896                 goto retry_timeout;
897         case -EPFNOSUPPORT:
898                 dprintk("RPC: %5u remote rpcbind service unavailable\n",
899                                 task->tk_pid);
900                 break;
901         case -EPROTONOSUPPORT:
902                 dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
903                                 task->tk_pid);
904                 task->tk_status = 0;
905                 task->tk_action = call_bind;
906                 return;
907         default:
908                 dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
909                                 task->tk_pid, -task->tk_status);
910                 status = -EIO;
911         }
912
913         rpc_exit(task, status);
914         return;
915
916 retry_timeout:
917         task->tk_action = call_timeout;
918 }
919
920 /*
921  * 4b.  Connect to the RPC server
922  */
923 static void
924 call_connect(struct rpc_task *task)
925 {
926         struct rpc_xprt *xprt = task->tk_xprt;
927
928         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
929                         task->tk_pid, xprt,
930                         (xprt_connected(xprt) ? "is" : "is not"));
931
932         task->tk_action = call_transmit;
933         if (!xprt_connected(xprt)) {
934                 task->tk_action = call_connect_status;
935                 if (task->tk_status < 0)
936                         return;
937                 xprt_connect(task);
938         }
939 }
940
941 /*
942  * 4c.  Sort out connect result
943  */
944 static void
945 call_connect_status(struct rpc_task *task)
946 {
947         struct rpc_clnt *clnt = task->tk_client;
948         int status = task->tk_status;
949
950         dprint_status(task);
951
952         task->tk_status = 0;
953         if (status >= 0) {
954                 clnt->cl_stats->netreconn++;
955                 task->tk_action = call_transmit;
956                 return;
957         }
958
959         /* Something failed: remote service port may have changed */
960         rpc_force_rebind(clnt);
961
962         switch (status) {
963         case -ENOTCONN:
964         case -EAGAIN:
965                 task->tk_action = call_bind;
966                 if (!RPC_IS_SOFT(task))
967                         return;
968                 /* if soft mounted, test if we've timed out */
969         case -ETIMEDOUT:
970                 task->tk_action = call_timeout;
971                 return;
972         }
973         rpc_exit(task, -EIO);
974 }
975
976 /*
977  * 5.   Transmit the RPC request, and wait for reply
978  */
979 static void
980 call_transmit(struct rpc_task *task)
981 {
982         dprint_status(task);
983
984         task->tk_action = call_status;
985         if (task->tk_status < 0)
986                 return;
987         task->tk_status = xprt_prepare_transmit(task);
988         if (task->tk_status != 0)
989                 return;
990         task->tk_action = call_transmit_status;
991         /* Encode here so that rpcsec_gss can use correct sequence number. */
992         if (rpc_task_need_encode(task)) {
993                 BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
994                 call_encode(task);
995                 /* Did the encode result in an error condition? */
996                 if (task->tk_status != 0)
997                         return;
998         }
999         xprt_transmit(task);
1000         if (task->tk_status < 0)
1001                 return;
1002         /*
1003          * On success, ensure that we call xprt_end_transmit() before sleeping
1004          * in order to allow access to the socket to other RPC requests.
1005          */
1006         call_transmit_status(task);
1007         if (task->tk_msg.rpc_proc->p_decode != NULL)
1008                 return;
1009         task->tk_action = rpc_exit_task;
1010         rpc_wake_up_task(task);
1011 }
1012
1013 /*
1014  * 5a.  Handle cleanup after a transmission
1015  */
1016 static void
1017 call_transmit_status(struct rpc_task *task)
1018 {
1019         task->tk_action = call_status;
1020         /*
1021          * Special case: if we've been waiting on the socket's write_space()
1022          * callback, then don't call xprt_end_transmit().
1023          */
1024         if (task->tk_status == -EAGAIN)
1025                 return;
1026         xprt_end_transmit(task);
1027         rpc_task_force_reencode(task);
1028 }
1029
1030 /*
1031  * 6.   Sort out the RPC call status
1032  */
1033 static void
1034 call_status(struct rpc_task *task)
1035 {
1036         struct rpc_clnt *clnt = task->tk_client;
1037         struct rpc_rqst *req = task->tk_rqstp;
1038         int             status;
1039
1040         if (req->rq_received > 0 && !req->rq_bytes_sent)
1041                 task->tk_status = req->rq_received;
1042
1043         dprint_status(task);
1044
1045         status = task->tk_status;
1046         if (status >= 0) {
1047                 task->tk_action = call_decode;
1048                 return;
1049         }
1050
1051         task->tk_status = 0;
1052         switch(status) {
1053         case -EHOSTDOWN:
1054         case -EHOSTUNREACH:
1055         case -ENETUNREACH:
1056                 /*
1057                  * Delay any retries for 3 seconds, then handle as if it
1058                  * were a timeout.
1059                  */
1060                 rpc_delay(task, 3*HZ);
1061         case -ETIMEDOUT:
1062                 task->tk_action = call_timeout;
1063                 if (task->tk_client->cl_discrtry)
1064                         xprt_disconnect(task->tk_xprt);
1065                 break;
1066         case -ECONNREFUSED:
1067         case -ENOTCONN:
1068                 rpc_force_rebind(clnt);
1069                 task->tk_action = call_bind;
1070                 break;
1071         case -EAGAIN:
1072                 task->tk_action = call_transmit;
1073                 break;
1074         case -EIO:
1075                 /* shutdown or soft timeout */
1076                 rpc_exit(task, status);
1077                 break;
1078         default:
1079                 printk("%s: RPC call returned error %d\n",
1080                                clnt->cl_protname, -status);
1081                 rpc_exit(task, status);
1082         }
1083 }
1084
1085 /*
1086  * 6a.  Handle RPC timeout
1087  *      We do not release the request slot, so we keep using the
1088  *      same XID for all retransmits.
1089  */
1090 static void
1091 call_timeout(struct rpc_task *task)
1092 {
1093         struct rpc_clnt *clnt = task->tk_client;
1094
1095         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1096                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1097                 goto retry;
1098         }
1099
1100         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1101         task->tk_timeouts++;
1102
1103         if (RPC_IS_SOFT(task)) {
1104                 printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1105                                 clnt->cl_protname, clnt->cl_server);
1106                 rpc_exit(task, -EIO);
1107                 return;
1108         }
1109
1110         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1111                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1112                 printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1113                         clnt->cl_protname, clnt->cl_server);
1114         }
1115         rpc_force_rebind(clnt);
1116
1117 retry:
1118         clnt->cl_stats->rpcretrans++;
1119         task->tk_action = call_bind;
1120         task->tk_status = 0;
1121 }
1122
1123 /*
1124  * 7.   Decode the RPC reply
1125  */
1126 static void
1127 call_decode(struct rpc_task *task)
1128 {
1129         struct rpc_clnt *clnt = task->tk_client;
1130         struct rpc_rqst *req = task->tk_rqstp;
1131         kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
1132         __be32          *p;
1133
1134         dprintk("RPC: %5u call_decode (status %d)\n",
1135                         task->tk_pid, task->tk_status);
1136
1137         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1138                 printk(KERN_NOTICE "%s: server %s OK\n",
1139                         clnt->cl_protname, clnt->cl_server);
1140                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1141         }
1142
1143         if (task->tk_status < 12) {
1144                 if (!RPC_IS_SOFT(task)) {
1145                         task->tk_action = call_bind;
1146                         clnt->cl_stats->rpcretrans++;
1147                         goto out_retry;
1148                 }
1149                 dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1150                                 clnt->cl_protname, task->tk_status);
1151                 task->tk_action = call_timeout;
1152                 goto out_retry;
1153         }
1154
1155         /*
1156          * Ensure that we see all writes made by xprt_complete_rqst()
1157          * before it changed req->rq_received.
1158          */
1159         smp_rmb();
1160         req->rq_rcv_buf.len = req->rq_private_buf.len;
1161
1162         /* Check that the softirq receive buffer is valid */
1163         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1164                                 sizeof(req->rq_rcv_buf)) != 0);
1165
1166         /* Verify the RPC header */
1167         p = call_verify(task);
1168         if (IS_ERR(p)) {
1169                 if (p == ERR_PTR(-EAGAIN))
1170                         goto out_retry;
1171                 return;
1172         }
1173
1174         task->tk_action = rpc_exit_task;
1175
1176         if (decode) {
1177                 lock_kernel();
1178                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1179                                                       task->tk_msg.rpc_resp);
1180                 unlock_kernel();
1181         }
1182         dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1183                         task->tk_status);
1184         return;
1185 out_retry:
1186         req->rq_received = req->rq_private_buf.len = 0;
1187         task->tk_status = 0;
1188         if (task->tk_client->cl_discrtry)
1189                 xprt_disconnect(task->tk_xprt);
1190 }
1191
1192 /*
1193  * 8.   Refresh the credentials if rejected by the server
1194  */
1195 static void
1196 call_refresh(struct rpc_task *task)
1197 {
1198         dprint_status(task);
1199
1200         xprt_release(task);     /* Must do to obtain new XID */
1201         task->tk_action = call_refreshresult;
1202         task->tk_status = 0;
1203         task->tk_client->cl_stats->rpcauthrefresh++;
1204         rpcauth_refreshcred(task);
1205 }
1206
1207 /*
1208  * 8a.  Process the results of a credential refresh
1209  */
1210 static void
1211 call_refreshresult(struct rpc_task *task)
1212 {
1213         int status = task->tk_status;
1214
1215         dprint_status(task);
1216
1217         task->tk_status = 0;
1218         task->tk_action = call_reserve;
1219         if (status >= 0 && rpcauth_uptodatecred(task))
1220                 return;
1221         if (status == -EACCES) {
1222                 rpc_exit(task, -EACCES);
1223                 return;
1224         }
1225         task->tk_action = call_refresh;
1226         if (status != -ETIMEDOUT)
1227                 rpc_delay(task, 3*HZ);
1228         return;
1229 }
1230
1231 /*
1232  * Call header serialization
1233  */
1234 static __be32 *
1235 call_header(struct rpc_task *task)
1236 {
1237         struct rpc_clnt *clnt = task->tk_client;
1238         struct rpc_rqst *req = task->tk_rqstp;
1239         __be32          *p = req->rq_svec[0].iov_base;
1240
1241         /* FIXME: check buffer size? */
1242
1243         p = xprt_skip_transport_header(task->tk_xprt, p);
1244         *p++ = req->rq_xid;             /* XID */
1245         *p++ = htonl(RPC_CALL);         /* CALL */
1246         *p++ = htonl(RPC_VERSION);      /* RPC version */
1247         *p++ = htonl(clnt->cl_prog);    /* program number */
1248         *p++ = htonl(clnt->cl_vers);    /* program version */
1249         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
1250         p = rpcauth_marshcred(task, p);
1251         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1252         return p;
1253 }
1254
1255 /*
1256  * Reply header verification
1257  */
1258 static __be32 *
1259 call_verify(struct rpc_task *task)
1260 {
1261         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
1262         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
1263         __be32  *p = iov->iov_base;
1264         u32 n;
1265         int error = -EACCES;
1266
1267         if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
1268                 /* RFC-1014 says that the representation of XDR data must be a
1269                  * multiple of four bytes
1270                  * - if it isn't pointer subtraction in the NFS client may give
1271                  *   undefined results
1272                  */
1273                 printk(KERN_WARNING
1274                        "call_verify: XDR representation not a multiple of"
1275                        " 4 bytes: 0x%x\n", task->tk_rqstp->rq_rcv_buf.len);
1276                 goto out_eio;
1277         }
1278         if ((len -= 3) < 0)
1279                 goto out_overflow;
1280         p += 1; /* skip XID */
1281
1282         if ((n = ntohl(*p++)) != RPC_REPLY) {
1283                 printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
1284                 goto out_garbage;
1285         }
1286         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
1287                 if (--len < 0)
1288                         goto out_overflow;
1289                 switch ((n = ntohl(*p++))) {
1290                         case RPC_AUTH_ERROR:
1291                                 break;
1292                         case RPC_MISMATCH:
1293                                 dprintk("RPC: %5u %s: RPC call version "
1294                                                 "mismatch!\n",
1295                                                 task->tk_pid, __FUNCTION__);
1296                                 error = -EPROTONOSUPPORT;
1297                                 goto out_err;
1298                         default:
1299                                 dprintk("RPC: %5u %s: RPC call rejected, "
1300                                                 "unknown error: %x\n",
1301                                                 task->tk_pid, __FUNCTION__, n);
1302                                 goto out_eio;
1303                 }
1304                 if (--len < 0)
1305                         goto out_overflow;
1306                 switch ((n = ntohl(*p++))) {
1307                 case RPC_AUTH_REJECTEDCRED:
1308                 case RPC_AUTH_REJECTEDVERF:
1309                 case RPCSEC_GSS_CREDPROBLEM:
1310                 case RPCSEC_GSS_CTXPROBLEM:
1311                         if (!task->tk_cred_retry)
1312                                 break;
1313                         task->tk_cred_retry--;
1314                         dprintk("RPC: %5u %s: retry stale creds\n",
1315                                         task->tk_pid, __FUNCTION__);
1316                         rpcauth_invalcred(task);
1317                         task->tk_action = call_refresh;
1318                         goto out_retry;
1319                 case RPC_AUTH_BADCRED:
1320                 case RPC_AUTH_BADVERF:
1321                         /* possibly garbled cred/verf? */
1322                         if (!task->tk_garb_retry)
1323                                 break;
1324                         task->tk_garb_retry--;
1325                         dprintk("RPC: %5u %s: retry garbled creds\n",
1326                                         task->tk_pid, __FUNCTION__);
1327                         task->tk_action = call_bind;
1328                         goto out_retry;
1329                 case RPC_AUTH_TOOWEAK:
1330                         printk(KERN_NOTICE "call_verify: server %s requires stronger "
1331                                "authentication.\n", task->tk_client->cl_server);
1332                         break;
1333                 default:
1334                         printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
1335                         error = -EIO;
1336                 }
1337                 dprintk("RPC: %5u %s: call rejected %d\n",
1338                                 task->tk_pid, __FUNCTION__, n);
1339                 goto out_err;
1340         }
1341         if (!(p = rpcauth_checkverf(task, p))) {
1342                 printk(KERN_WARNING "call_verify: auth check failed\n");
1343                 goto out_garbage;               /* bad verifier, retry */
1344         }
1345         len = p - (__be32 *)iov->iov_base - 1;
1346         if (len < 0)
1347                 goto out_overflow;
1348         switch ((n = ntohl(*p++))) {
1349         case RPC_SUCCESS:
1350                 return p;
1351         case RPC_PROG_UNAVAIL:
1352                 dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
1353                                 task->tk_pid, __FUNCTION__,
1354                                 (unsigned int)task->tk_client->cl_prog,
1355                                 task->tk_client->cl_server);
1356                 error = -EPFNOSUPPORT;
1357                 goto out_err;
1358         case RPC_PROG_MISMATCH:
1359                 dprintk("RPC: %5u %s: program %u, version %u unsupported by "
1360                                 "server %s\n", task->tk_pid, __FUNCTION__,
1361                                 (unsigned int)task->tk_client->cl_prog,
1362                                 (unsigned int)task->tk_client->cl_vers,
1363                                 task->tk_client->cl_server);
1364                 error = -EPROTONOSUPPORT;
1365                 goto out_err;
1366         case RPC_PROC_UNAVAIL:
1367                 dprintk("RPC: %5u %s: proc %p unsupported by program %u, "
1368                                 "version %u on server %s\n",
1369                                 task->tk_pid, __FUNCTION__,
1370                                 task->tk_msg.rpc_proc,
1371                                 task->tk_client->cl_prog,
1372                                 task->tk_client->cl_vers,
1373                                 task->tk_client->cl_server);
1374                 error = -EOPNOTSUPP;
1375                 goto out_err;
1376         case RPC_GARBAGE_ARGS:
1377                 dprintk("RPC: %5u %s: server saw garbage\n",
1378                                 task->tk_pid, __FUNCTION__);
1379                 break;                  /* retry */
1380         default:
1381                 printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
1382                 /* Also retry */
1383         }
1384
1385 out_garbage:
1386         task->tk_client->cl_stats->rpcgarbage++;
1387         if (task->tk_garb_retry) {
1388                 task->tk_garb_retry--;
1389                 dprintk("RPC: %5u %s: retrying\n",
1390                                 task->tk_pid, __FUNCTION__);
1391                 task->tk_action = call_bind;
1392 out_retry:
1393                 return ERR_PTR(-EAGAIN);
1394         }
1395         printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
1396 out_eio:
1397         error = -EIO;
1398 out_err:
1399         rpc_exit(task, error);
1400         return ERR_PTR(error);
1401 out_overflow:
1402         printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
1403         goto out_garbage;
1404 }
1405
1406 static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
1407 {
1408         return 0;
1409 }
1410
1411 static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
1412 {
1413         return 0;
1414 }
1415
1416 static struct rpc_procinfo rpcproc_null = {
1417         .p_encode = rpcproc_encode_null,
1418         .p_decode = rpcproc_decode_null,
1419 };
1420
1421 int rpc_ping(struct rpc_clnt *clnt, int flags)
1422 {
1423         struct rpc_message msg = {
1424                 .rpc_proc = &rpcproc_null,
1425         };
1426         int err;
1427         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1428         err = rpc_call_sync(clnt, &msg, flags);
1429         put_rpccred(msg.rpc_cred);
1430         return err;
1431 }
1432
1433 #ifdef RPC_DEBUG
1434 void rpc_show_tasks(void)
1435 {
1436         struct rpc_clnt *clnt;
1437         struct rpc_task *t;
1438
1439         spin_lock(&rpc_client_lock);
1440         if (list_empty(&all_clients))
1441                 goto out;
1442         printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
1443                 "-rpcwait -action- ---ops--\n");
1444         list_for_each_entry(clnt, &all_clients, cl_clients) {
1445                 if (list_empty(&clnt->cl_tasks))
1446                         continue;
1447                 spin_lock(&clnt->cl_lock);
1448                 list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
1449                         const char *rpc_waitq = "none";
1450
1451                         if (RPC_IS_QUEUED(t))
1452                                 rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
1453
1454                         printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
1455                                 t->tk_pid,
1456                                 (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
1457                                 t->tk_flags, t->tk_status,
1458                                 t->tk_client,
1459                                 (t->tk_client ? t->tk_client->cl_prog : 0),
1460                                 t->tk_rqstp, t->tk_timeout,
1461                                 rpc_waitq,
1462                                 t->tk_action, t->tk_ops);
1463                 }
1464                 spin_unlock(&clnt->cl_lock);
1465         }
1466 out:
1467         spin_unlock(&rpc_client_lock);
1468 }
1469 #endif