]> pilppa.org Git - linux-2.6-omap-h63xx.git/blobdiff - net/rxrpc/ar-input.c
Merge branch 'for-linus' of git://git.o-hand.com/linux-rpurdie-leds
[linux-2.6-omap-h63xx.git] / net / rxrpc / ar-input.c
index 323c3454561cecc976f04d7f866e34589ec05ab5..91b5bbb003e26b53496cfda72b687e009487e9ba 100644 (file)
@@ -42,6 +42,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
                        bool force, bool terminal)
 {
        struct rxrpc_skb_priv *sp;
+       struct rxrpc_sock *rx = call->socket;
        struct sock *sk;
        int skb_len, ret;
 
@@ -64,7 +65,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
                return 0;
        }
 
-       sk = &call->socket->sk;
+       sk = &rx->sk;
 
        if (!force) {
                /* cast skb->rcvbuf to unsigned...  It's pointless, but
@@ -89,25 +90,30 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
                skb->sk = sk;
                atomic_add(skb->truesize, &sk->sk_rmem_alloc);
 
-               /* Cache the SKB length before we tack it onto the receive
-                * queue.  Once it is added it no longer belongs to us and
-                * may be freed by other threads of control pulling packets
-                * from the queue.
-                */
-               skb_len = skb->len;
-
-               _net("post skb %p", skb);
-               __skb_queue_tail(&sk->sk_receive_queue, skb);
-               spin_unlock_bh(&sk->sk_receive_queue.lock);
-
-               if (!sock_flag(sk, SOCK_DEAD))
-                       sk->sk_data_ready(sk, skb_len);
-
                if (terminal) {
                        _debug("<<<< TERMINAL MESSAGE >>>>");
                        set_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags);
                }
 
+               /* allow interception by a kernel service */
+               if (rx->interceptor) {
+                       rx->interceptor(sk, call->user_call_ID, skb);
+                       spin_unlock_bh(&sk->sk_receive_queue.lock);
+               } else {
+
+                       /* Cache the SKB length before we tack it onto the
+                        * receive queue.  Once it is added it no longer
+                        * belongs to us and may be freed by other threads of
+                        * control pulling packets from the queue */
+                       skb_len = skb->len;
+
+                       _net("post skb %p", skb);
+                       __skb_queue_tail(&sk->sk_receive_queue, skb);
+                       spin_unlock_bh(&sk->sk_receive_queue.lock);
+
+                       if (!sock_flag(sk, SOCK_DEAD))
+                               sk->sk_data_ready(sk, skb_len);
+               }
                skb = NULL;
        } else {
                spin_unlock_bh(&sk->sk_receive_queue.lock);
@@ -162,7 +168,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
        /* we may already have the packet in the out of sequence queue */
        ackbit = seq - (call->rx_data_eaten + 1);
        ASSERTCMP(ackbit, >=, 0);
-       if (__test_and_set_bit(ackbit, &call->ackr_window)) {
+       if (__test_and_set_bit(ackbit, call->ackr_window)) {
                _debug("dup oos #%u [%u,%u]",
                       seq, call->rx_data_eaten, call->rx_data_post);
                ack = RXRPC_ACK_DUPLICATE;
@@ -171,7 +177,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
 
        if (seq >= call->ackr_win_top) {
                _debug("exceed #%u [%u]", seq, call->ackr_win_top);
-               __clear_bit(ackbit, &call->ackr_window);
+               __clear_bit(ackbit, call->ackr_window);
                ack = RXRPC_ACK_EXCEEDS_WINDOW;
                goto discard_and_ack;
        }
@@ -209,7 +215,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
        ret = rxrpc_queue_rcv_skb(call, skb, false, terminal);
        if (ret < 0) {
                if (ret == -ENOMEM || ret == -ENOBUFS) {
-                       __clear_bit(ackbit, &call->ackr_window);
+                       __clear_bit(ackbit, call->ackr_window);
                        ack = RXRPC_ACK_NOSPACE;
                        goto discard_and_ack;
                }
@@ -232,7 +238,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
                read_lock(&call->state_lock);
                if (call->state < RXRPC_CALL_COMPLETE &&
                    !test_and_set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events))
-                       schedule_work(&call->processor);
+                       rxrpc_queue_call(call);
                read_unlock(&call->state_lock);
        }
 
@@ -267,7 +273,7 @@ enqueue_packet:
        atomic_inc(&call->ackr_not_idle);
        read_lock(&call->state_lock);
        if (call->state < RXRPC_CALL_DEAD)
-               schedule_work(&call->processor);
+               rxrpc_queue_call(call);
        read_unlock(&call->state_lock);
        _leave(" = 0 [queued]");
        return 0;
@@ -360,7 +366,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
                        call->state = RXRPC_CALL_REMOTELY_ABORTED;
                        call->abort_code = abort_code;
                        set_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
-                       schedule_work(&call->processor);
+                       rxrpc_queue_call(call);
                }
                goto free_packet_unlock;
 
@@ -375,7 +381,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
                case RXRPC_CALL_CLIENT_SEND_REQUEST:
                        call->state = RXRPC_CALL_SERVER_BUSY;
                        set_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
-                       schedule_work(&call->processor);
+                       rxrpc_queue_call(call);
                case RXRPC_CALL_SERVER_BUSY:
                        goto free_packet_unlock;
                default:
@@ -419,7 +425,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
                read_lock_bh(&call->state_lock);
                if (call->state < RXRPC_CALL_DEAD) {
                        skb_queue_tail(&call->rx_queue, skb);
-                       schedule_work(&call->processor);
+                       rxrpc_queue_call(call);
                        skb = NULL;
                }
                read_unlock_bh(&call->state_lock);
@@ -434,7 +440,7 @@ protocol_error_locked:
                call->state = RXRPC_CALL_LOCALLY_ABORTED;
                call->abort_code = RX_PROTOCOL_ERROR;
                set_bit(RXRPC_CALL_ABORT, &call->events);
-               schedule_work(&call->processor);
+               rxrpc_queue_call(call);
        }
 free_packet_unlock:
        write_unlock_bh(&call->state_lock);
@@ -506,7 +512,7 @@ protocol_error:
                call->state = RXRPC_CALL_LOCALLY_ABORTED;
                call->abort_code = RX_PROTOCOL_ERROR;
                set_bit(RXRPC_CALL_ABORT, &call->events);
-               schedule_work(&call->processor);
+               rxrpc_queue_call(call);
        }
        write_unlock_bh(&call->state_lock);
        _leave("");
@@ -542,7 +548,7 @@ static void rxrpc_post_packet_to_call(struct rxrpc_connection *conn,
        switch (call->state) {
        case RXRPC_CALL_LOCALLY_ABORTED:
                if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
-                       schedule_work(&call->processor);
+                       rxrpc_queue_call(call);
        case RXRPC_CALL_REMOTELY_ABORTED:
        case RXRPC_CALL_NETWORK_ERROR:
        case RXRPC_CALL_DEAD:
@@ -591,7 +597,7 @@ dead_call:
            sp->hdr.seq == __constant_cpu_to_be32(1)) {
                _debug("incoming call");
                skb_queue_tail(&conn->trans->local->accept_queue, skb);
-               schedule_work(&conn->trans->local->acceptor);
+               rxrpc_queue_work(&conn->trans->local->acceptor);
                goto done;
        }
 
@@ -630,7 +636,7 @@ found_completed_call:
        _debug("final ack again");
        rxrpc_get_call(call);
        set_bit(RXRPC_CALL_ACK_FINAL, &call->events);
-       schedule_work(&call->processor);
+       rxrpc_queue_call(call);
 
 free_unlock:
        read_unlock(&call->state_lock);
@@ -651,7 +657,7 @@ static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
 
        atomic_inc(&conn->usage);
        skb_queue_tail(&conn->rx_queue, skb);
-       schedule_work(&conn->processor);
+       rxrpc_queue_conn(conn);
 }
 
 /*
@@ -767,7 +773,7 @@ cant_route_call:
                if (sp->hdr.seq == __constant_cpu_to_be32(1)) {
                        _debug("first packet");
                        skb_queue_tail(&local->accept_queue, skb);
-                       schedule_work(&local->acceptor);
+                       rxrpc_queue_work(&local->acceptor);
                        rxrpc_put_local(local);
                        _leave(" [incoming]");
                        return;