From 8809b255a9fca8c3179491d3bc9268c42e23ba97 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Tue, 25 Oct 2011 10:44:35 -0400 Subject: tipc: improve the link deferred queue insertion algorithm Re-code the algorithm for inserting an out-of-sequence message into a unicast or broadcast link's deferred message queue. It remains functionally equivalent but should be easier to understand/maintain. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 48 +++++++++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index ac1832a66f8a..c317abf74a78 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1853,17 +1853,16 @@ cont: } /* - * link_defer_buf(): Sort a received out-of-sequence packet - * into the deferred reception queue. - * Returns the increase of the queue length,i.e. 0 or 1 + * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue + * + * Returns increase in queue length (i.e. 0 or 1) */ -u32 tipc_link_defer_pkt(struct sk_buff **head, - struct sk_buff **tail, +u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, struct sk_buff *buf) { - struct sk_buff *prev = NULL; - struct sk_buff *crs = *head; + struct sk_buff *queue_buf; + struct sk_buff **prev; u32 seq_no = buf_seqno(buf); buf->next = NULL; @@ -1881,31 +1880,30 @@ u32 tipc_link_defer_pkt(struct sk_buff **head, return 1; } - /* Scan through queue and sort it in */ - do { - struct tipc_msg *msg = buf_msg(crs); + /* Locate insertion point in queue, then insert; discard if duplicate */ + prev = head; + queue_buf = *head; + for (;;) { + u32 curr_seqno = buf_seqno(queue_buf); - if (less(seq_no, msg_seqno(msg))) { - buf->next = crs; - if (prev) - prev->next = buf; - else - *head = buf; - return 1; + if (seq_no == curr_seqno) { + buf_discard(buf); + return 0; } - if (seq_no == msg_seqno(msg)) + + if (less(seq_no, curr_seqno)) break; - prev = crs; - crs = crs->next; - } while (crs); - /* Message is a duplicate of an existing message */ + prev = &queue_buf->next; + queue_buf = queue_buf->next; + } - buf_discard(buf); - return 0; + buf->next = queue_buf; + *prev = buf; + return 1; } -/** +/* * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet */ -- cgit v1.2.3 From 92d2c905b404d8d056ce35a0ce645e23529742c2 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Tue, 25 Oct 2011 11:20:26 -0400 Subject: tipc: Prevent transmission of outdated link protocol messages Ensures that a link endpoint discards any previously deferred link protocol message whenever it attempts to send a new one. Previously, it was possible for a link protocol message that was unsent due to congestion to be transmitted after newer protocol messages had been sent. The stale link protocol message might then cause the receiving link endpoint to malfunction because of its outdated conent. Thanks to Osamu Kaminuma [okaminum@avaya.com] for diagnosing the problem and contributing a prototype patch. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 53 +++++++++++++++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index c317abf74a78..bee316ce387c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1954,6 +1954,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, u32 msg_size = sizeof(l_ptr->proto_msg); int r_flag; + /* Discard any previous message that was deferred due to congestion */ + + if (l_ptr->proto_msg_queue) { + buf_discard(l_ptr->proto_msg_queue); + l_ptr->proto_msg_queue = NULL; + } + if (link_blocked(l_ptr)) return; @@ -1962,6 +1969,8 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG)) return; + /* Create protocol message with "out-of-sequence" sequence number */ + msg_set_type(msg, msg_typ); msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); @@ -2018,44 +2027,36 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr)); msg_set_redundant_link(msg, r_flag); msg_set_linkprio(msg, l_ptr->priority); - - /* Ensure sequence number will not fit : */ + msg_set_size(msg, msg_size); msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2))); - /* Congestion? */ - - if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { - if (!l_ptr->proto_msg_queue) { - l_ptr->proto_msg_queue = - tipc_buf_acquire(sizeof(l_ptr->proto_msg)); - } - buf = l_ptr->proto_msg_queue; - if (!buf) - return; - skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); - return; - } - - /* Message can be sent */ - buf = tipc_buf_acquire(msg_size); if (!buf) return; skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); - msg_set_size(buf_msg(buf), msg_size); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - l_ptr->unacked_window = 0; - buf_discard(buf); + /* Defer message if bearer is already congested */ + + if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { + l_ptr->proto_msg_queue = buf; + return; + } + + /* Defer message if attempting to send results in bearer congestion */ + + if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { + tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); + l_ptr->proto_msg_queue = buf; + l_ptr->stats.bearer_congs++; return; } - /* New congestion */ - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); - l_ptr->proto_msg_queue = buf; - l_ptr->stats.bearer_congs++; + /* Discard message if it was sent successfully */ + + l_ptr->unacked_window = 0; + buf_discard(buf); } /* -- cgit v1.2.3 From 4d75313ce9b832efc4efb487f080b5ed72beae2c Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Tue, 25 Oct 2011 12:19:05 -0400 Subject: tipc: Prevent broadcast link stalling in dual LAN environments Ensure that sequence number information about incoming broadcast link messages is initialized only by the activation of the first link to a given cluster node. Previously, a race condition allowed reset and/or activation messages for a second link to re-initialize this sequence number information with obsolete values. This could trigger TIPC to request the retransmission of previously acknowledged broadcast link messages from that node, resulting in broadcast link processing becoming stalled if the node had already released one or more of those messages and was unable to perform the required retransmission. Thanks to Laser for identifying this problem and assisting in the development of this fix. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index bee316ce387c..4ea6cad11746 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -2128,14 +2128,15 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) } l_ptr->owner->bclink.supported = (max_pkt_info != 0); + /* Synchronize broadcast link info, if not done previously */ + + if (!tipc_node_is_up(l_ptr->owner)) + l_ptr->owner->bclink.last_in = msg_last_bcast(msg); + link_state_event(l_ptr, msg_type(msg)); l_ptr->peer_session = msg_session(msg); l_ptr->peer_bearer_id = msg_bearer_id(msg); - - /* Synchronize broadcast sequence numbers */ - if (!tipc_node_redundant_links(l_ptr->owner)) - l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg)); break; case STATE_MSG: -- cgit v1.2.3 From 934993137199ffb56fef50664f87e71cdb3471b0 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Tue, 25 Oct 2011 15:14:46 -0400 Subject: tipc: Ensure broadcast link re-acquires node after link failure Fix a bug that can prevent TIPC from sending broadcast messages to a node if contact with the node is lost and then regained. The problem occurs if the broadcast link first clears the flag indicating the node is part of the link's distribution set (when it loses contact with the node), and later fails to restore the flag (when contact is regained); restoration fails if contact with the node is regained by implicit unicast link activation triggered by the arrival of a data message, rather than explicitly by the arrival of a link activation message. The broadcast link now uses separate fields to track whether a node is theoretically capable of receiving broadcast messages versus whether it is actually part of the link's distribution set. The former member is updated by the receipt of link protocol messages, which can occur at any time; the latter member is updated only when contact with the node is gained or lost. This change also permits the simplification of several conditional expressions since the broadcast link's "supported" field can now only be set if there are working links to the associated node. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 5 +++-- net/tipc/node.c | 3 ++- net/tipc/node.h | 4 +++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 4ea6cad11746..3405f560a84d 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1502,6 +1502,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, tipc_addr_string_fill(addr_string, n_ptr->addr); info("Multicast link info for %s\n", addr_string); + info("Supportable: %d, ", n_ptr->bclink.supportable); info("Supported: %d, ", n_ptr->bclink.supported); info("Acked: %u\n", n_ptr->bclink.acked); info("Last in: %u, ", n_ptr->bclink.last_in); @@ -1736,7 +1737,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) /* Release acked messages */ - if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported) + if (n_ptr->bclink.supported) tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); crs = l_ptr->first_out; @@ -2126,7 +2127,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) } else { l_ptr->max_pkt = l_ptr->max_pkt_target; } - l_ptr->owner->bclink.supported = (max_pkt_info != 0); + l_ptr->owner->bclink.supportable = (max_pkt_info != 0); /* Synchronize broadcast link info, if not done previously */ diff --git a/net/tipc/node.c b/net/tipc/node.c index 6b226faad89f..9196f943b835 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -306,8 +306,9 @@ static void node_established_contact(struct tipc_node *n_ptr) /* Syncronize broadcast acks */ n_ptr->bclink.acked = tipc_bclink_get_last_sent(); - if (n_ptr->bclink.supported) { + if (n_ptr->bclink.supportable) { tipc_bclink_add_node(n_ptr->addr); + n_ptr->bclink.supported = 1; if (n_ptr->addr < tipc_own_addr) tipc_own_tag++; } diff --git a/net/tipc/node.h b/net/tipc/node.h index 0b1c5f8b6996..90689f487615 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -62,6 +62,7 @@ * @link_cnt: number of links to node * @permit_changeover: non-zero if node has redundant links to this system * @bclink: broadcast-related info + * @supportable: non-zero if node supports TIPC b'cast link capability * @supported: non-zero if node supports TIPC b'cast capability * @acked: sequence # of last outbound b'cast message acknowledged by node * @last_in: sequence # of last in-sequence b'cast message received from node @@ -86,7 +87,8 @@ struct tipc_node { int block_setup; int permit_changeover; struct { - int supported; + u8 supportable; + u8 supported; u32 acked; u32 last_in; u32 gap_after; -- cgit v1.2.3 From 47361c87c504d89f1ba50b4230d56ef67792c258 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Wed, 26 Oct 2011 10:55:16 -0400 Subject: tipc: Fix problem with broadcast link synchronization between nodes Corrects a problem in which a link endpoint that activates as the result of receiving a RESET/STATE sequence of link protocol messages fails to properly record the broadcast link status information about the node to which it is now communicating with. (The problem does not occur with the more common RESET/ACTIVATE sequence of messages.) The fix ensures that the broadcast link status info is updated after the RESET message resets the link endpoint, rather than before, thereby preventing new information from being overwritten by the reset operation. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 3405f560a84d..1150ba5a648b 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -2105,6 +2105,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) l_ptr->owner->block_setup = WAIT_NODE_DOWN; } + link_state_event(l_ptr, RESET_MSG); + /* fall thru' */ case ACTIVATE_MSG: /* Update link settings according other endpoint's values */ @@ -2134,10 +2136,11 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) if (!tipc_node_is_up(l_ptr->owner)) l_ptr->owner->bclink.last_in = msg_last_bcast(msg); - link_state_event(l_ptr, msg_type(msg)); - l_ptr->peer_session = msg_session(msg); l_ptr->peer_bearer_id = msg_bearer_id(msg); + + if (msg_type(msg) == ACTIVATE_MSG) + link_state_event(l_ptr, ACTIVATE_MSG); break; case STATE_MSG: -- cgit v1.2.3 From 57732560d1aa7d454d10e557f8959d19d1454174 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Wed, 26 Oct 2011 11:41:45 -0400 Subject: tipc: Add missing broadcast link lock when sending NACK Ensures that any attempt to send a NACK message over TIPC's broadcast link has exclusive access to the link's main data structures, to prevent interference with a simultaneous attempt to send other broadcast link traffic (such as application-generated multicast messages). Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 8eb87b11d100..7342abc2cfa1 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -340,8 +340,10 @@ static void bclink_send_nack(struct tipc_node *n_ptr) msg_set_bcgap_to(msg, n_ptr->bclink.gap_to); msg_set_bcast_tag(msg, tipc_own_tag); + spin_lock_bh(&bc_lock); tipc_bearer_send(&bcbearer->bearer, buf, NULL); bcl->stats.sent_nacks++; + spin_unlock_bh(&bc_lock); buf_discard(buf); /* -- cgit v1.2.3 From 8a275a6a30ba871eb34ea41c1fbb507039f4c0dc Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Wed, 26 Oct 2011 15:33:44 -0400 Subject: tipc: Fix node lock reclamation issues in broadcast link reception Fixes a pair of problems in broadcast link message reception code relating to the reclamation of the node lock after consuming an in-sequence message. 1) Now retests to see if the sending node is still up after reclaiming the node lock, and bails out if it is non-operational. 2) Now manipulates the node's deferred message queue only after reclaiming the node lock, rather than using queue head pointer information that was cached previously. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 58 ++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 7342abc2cfa1..e7df313020ce 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -474,7 +474,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) struct tipc_node *node; u32 next_in; u32 seqno; - struct sk_buff *deferred; + int deferred; /* Screen out unwanted broadcast messages */ @@ -489,6 +489,8 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) if (unlikely(!node->bclink.supported)) goto unlock; + /* Handle broadcast protocol message */ + if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { if (msg_type(msg) != STATE_MSG) goto unlock; @@ -513,11 +515,11 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) /* Handle in-sequence broadcast message */ -receive: - next_in = mod(node->bclink.last_in + 1); seqno = msg_seqno(msg); + next_in = mod(node->bclink.last_in + 1); if (likely(seqno == next_in)) { +receive: bcl->stats.recv_info++; node->bclink.last_in++; bclink_set_gap(node); @@ -551,23 +553,40 @@ receive: buf_discard(buf); } buf = NULL; + + /* Determine new synchronization state */ + tipc_node_lock(node); - deferred = node->bclink.deferred_head; - if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) { - buf = deferred; - msg = buf_msg(buf); - node->bclink.deferred_head = deferred->next; - goto receive; - } - } else if (less(next_in, seqno)) { + if (unlikely(!tipc_node_is_up(node))) + goto unlock; + + if (!node->bclink.deferred_head) + goto unlock; + + msg = buf_msg(node->bclink.deferred_head); + seqno = msg_seqno(msg); + next_in = mod(next_in + 1); + if (seqno != next_in) + goto unlock; + + /* Take in-sequence message from deferred queue & deliver it */ + + buf = node->bclink.deferred_head; + node->bclink.deferred_head = buf->next; + goto receive; + } + + /* Handle out-of-sequence broadcast message */ + + if (less(next_in, seqno)) { u32 gap_after = node->bclink.gap_after; u32 gap_to = node->bclink.gap_to; - if (tipc_link_defer_pkt(&node->bclink.deferred_head, - &node->bclink.deferred_tail, - buf)) { + deferred = tipc_link_defer_pkt(&node->bclink.deferred_head, + &node->bclink.deferred_tail, + buf); + if (deferred) { node->bclink.nack_sync++; - bcl->stats.deferred_recv++; if (seqno == mod(gap_after + 1)) node->bclink.gap_after = seqno; else if (less(gap_after, seqno) && less(seqno, gap_to)) @@ -579,9 +598,12 @@ receive: bclink_send_nack(node); bclink_set_gap(node); } - } else { - bcl->stats.duplicates++; - } + } else + deferred = 0; + + if (deferred) + bcl->stats.deferred_recv++; + unlock: tipc_node_unlock(node); exit: -- cgit v1.2.3 From 0232c5a566ff52d5c9fc1dda70253c942628ca66 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Wed, 26 Oct 2011 15:57:26 -0400 Subject: tipc: Fix bug in broadcast link duplicate message statistics Modifies broadcast link so that it increments the "received duplicate message" count if an incoming message cannot be added to the deferred message queue because it is already present in the queue. (The aligns broadcast link behavior with that of TIPC's unicast links.) Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index e7df313020ce..035b350be5c6 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -603,6 +603,8 @@ receive: if (deferred) bcl->stats.deferred_recv++; + else + bcl->stats.duplicates++; unlock: tipc_node_unlock(node); -- cgit v1.2.3 From b98158e3b36645305363a598d91c544fa31446f1 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Wed, 26 Oct 2011 16:13:35 -0400 Subject: tipc: Add missing locks in broadcast link statistics accumulation Ensures that all attempts to update broadcast link statistics are done only while holding the lock that protects the link's main data structures, to prevent interference by simultaneous updates caused by messages arriving on other interfaces. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 035b350be5c6..facc216c6a92 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -520,6 +520,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) if (likely(seqno == next_in)) { receive: + spin_lock_bh(&bc_lock); bcl->stats.recv_info++; node->bclink.last_in++; bclink_set_gap(node); @@ -527,7 +528,9 @@ receive: bclink_send_ack(node); bcl->stats.sent_acks++; } + if (likely(msg_isdata(msg))) { + spin_unlock_bh(&bc_lock); tipc_node_unlock(node); if (likely(msg_mcast(msg))) tipc_port_recv_mcast(buf, NULL); @@ -536,6 +539,7 @@ receive: } else if (msg_user(msg) == MSG_BUNDLER) { bcl->stats.recv_bundles++; bcl->stats.recv_bundled += msg_msgcnt(msg); + spin_unlock_bh(&bc_lock); tipc_node_unlock(node); tipc_link_recv_bundle(buf); } else if (msg_user(msg) == MSG_FRAGMENTER) { @@ -543,12 +547,15 @@ receive: if (tipc_link_recv_fragment(&node->bclink.defragm, &buf, &msg)) bcl->stats.recv_fragmented++; + spin_unlock_bh(&bc_lock); tipc_node_unlock(node); tipc_net_route_msg(buf); } else if (msg_user(msg) == NAME_DISTRIBUTOR) { + spin_unlock_bh(&bc_lock); tipc_node_unlock(node); tipc_named_recv(buf); } else { + spin_unlock_bh(&bc_lock); tipc_node_unlock(node); buf_discard(buf); } @@ -601,11 +608,15 @@ receive: } else deferred = 0; + spin_lock_bh(&bc_lock); + if (deferred) bcl->stats.deferred_recv++; else bcl->stats.duplicates++; + spin_unlock_bh(&bc_lock); + unlock: tipc_node_unlock(node); exit: -- cgit v1.2.3 From 7a54d4a99dcbbfdf1d4550faa19b615091137953 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Thu, 27 Oct 2011 14:17:53 -0400 Subject: tipc: Major redesign of broadcast link ACK/NACK algorithms Completely redesigns broadcast link ACK and NACK mechanisms to prevent spurious retransmit requests in dual LAN networks, and to prevent the broadcast link from stalling due to the failure of a receiving node to acknowledge receiving a broadcast message or request its retransmission. Note: These changes only impact the timing of when ACK and NACK messages are sent, and not the basic broadcast link protocol itself, so inter- operability with nodes using the "classic" algorithms is maintained. The revised algorithms are as follows: 1) An explicit ACK message is still sent after receiving 16 in-sequence messages, and implicit ACK information continues to be carried in other unicast link message headers (including link state messages). However, the timing of explicit ACKs is now based on the receiving node's absolute network address rather than its relative network address to ensure that the failure of another node does not delay the ACK beyond its 16 message target. 2) A NACK message is now typically sent only when a message gap persists for two consecutive incoming link state messages; this ensures that a suspected gap is not confirmed until both LANs in a dual LAN network have had an opportunity to deliver the message, thereby preventing spurious NACKs. A NACK message can also be generated by the arrival of a single link state message, if the deferred queue is so big that the current message gap cannot be the result of "normal" mis-ordering due to the use of dual LANs (or one LAN using a bonded interface). Since link state messages typically arrive at different nodes at different times the problem of multiple nodes issuing identical NACKs simultaneously is inherently avoided. 3) Nodes continue to "peek" at NACK messages sent by other nodes. If another node requests retransmission of a message gap suspected (but not yet confirmed) by the peeking node, the peeking node forgets about the gap and does not generate a duplicate retransmit request. (If the peeking node subsequently fails to receive the lost message, later link state messages will cause it to rediscover and confirm the gap and send another NACK.) 4) Message gap "equality" is now determined by the start of the gap only. This is sufficient to deal with the most common cases of message loss, and eliminates the need for complex end of gap computations. 5) A peeking node no longer tries to determine whether it should send a complementary NACK, since the most common cases of message loss don't require it to be sent. Consequently, the node no longer examines the "broadcast tag" field of a NACK message when peeking. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 226 +++++++++++++++++++------------------------------------ net/tipc/bcast.h | 2 +- net/tipc/link.c | 21 ++++-- net/tipc/node.c | 4 +- net/tipc/node.h | 12 +-- 5 files changed, 100 insertions(+), 165 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index facc216c6a92..1f3b1607d9d4 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void) return bcl->fsm_msg_cnt; } -/** - * bclink_set_gap - set gap according to contents of current deferred pkt queue - * - * Called with 'node' locked, bc_lock unlocked - */ - -static void bclink_set_gap(struct tipc_node *n_ptr) -{ - struct sk_buff *buf = n_ptr->bclink.deferred_head; - - n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = - mod(n_ptr->bclink.last_in); - if (unlikely(buf != NULL)) - n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1); -} - -/** - * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment - * - * This mechanism endeavours to prevent all nodes in network from trying - * to ACK or NACK at the same time. - * - * Note: TIPC uses a different trigger to distribute ACKs than it does to - * distribute NACKs, but tries to use the same spacing (divide by 16). - */ - -static int bclink_ack_allowed(u32 n) +static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) { - return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag; + node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? + seqno : node->bclink.last_sent; } -/** +/* * tipc_bclink_retransmit_to - get most recent node to request retransmission * * Called with bc_lock locked @@ -300,44 +275,56 @@ exit: spin_unlock_bh(&bc_lock); } -/** - * bclink_send_ack - unicast an ACK msg +/* + * tipc_bclink_update_link_state - update broadcast link state * * tipc_net_lock and node lock set */ -static void bclink_send_ack(struct tipc_node *n_ptr) +void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) { - struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1]; + struct sk_buff *buf; - if (l_ptr != NULL) - tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); -} + /* Ignore "stale" link state info */ -/** - * bclink_send_nack- broadcast a NACK msg - * - * tipc_net_lock and node lock set - */ + if (less_eq(last_sent, n_ptr->bclink.last_in)) + return; -static void bclink_send_nack(struct tipc_node *n_ptr) -{ - struct sk_buff *buf; - struct tipc_msg *msg; + /* Update link synchronization state; quit if in sync */ + + bclink_update_last_sent(n_ptr, last_sent); + + if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) + return; + + /* Update out-of-sync state; quit if loss is still unconfirmed */ - if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to)) + if ((++n_ptr->bclink.oos_state) == 1) { + if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) + return; + n_ptr->bclink.oos_state++; + } + + /* Don't NACK if one has been recently sent (or seen) */ + + if (n_ptr->bclink.oos_state & 0x1) return; + /* Send NACK */ + buf = tipc_buf_acquire(INT_H_SIZE); if (buf) { - msg = buf_msg(buf); + struct tipc_msg *msg = buf_msg(buf); + tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, - INT_H_SIZE, n_ptr->addr); + INT_H_SIZE, n_ptr->addr); msg_set_non_seq(msg, 1); msg_set_mc_netid(msg, tipc_net_id); - msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in)); - msg_set_bcgap_after(msg, n_ptr->bclink.gap_after); - msg_set_bcgap_to(msg, n_ptr->bclink.gap_to); + msg_set_bcast_ack(msg, n_ptr->bclink.last_in); + msg_set_bcgap_after(msg, n_ptr->bclink.last_in); + msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head + ? buf_seqno(n_ptr->bclink.deferred_head) - 1 + : n_ptr->bclink.last_sent); msg_set_bcast_tag(msg, tipc_own_tag); spin_lock_bh(&bc_lock); @@ -346,96 +333,37 @@ static void bclink_send_nack(struct tipc_node *n_ptr) spin_unlock_bh(&bc_lock); buf_discard(buf); - /* - * Ensure we doesn't send another NACK msg to the node - * until 16 more deferred messages arrive from it - * (i.e. helps prevent all nodes from NACK'ing at same time) - */ - - n_ptr->bclink.nack_sync = tipc_own_tag; + n_ptr->bclink.oos_state++; } } -/** - * tipc_bclink_check_gap - send a NACK if a sequence gap exists +/* + * bclink_peek_nack - monitor retransmission requests sent by other nodes * - * tipc_net_lock and node lock set - */ - -void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent) -{ - if (!n_ptr->bclink.supported || - less_eq(last_sent, mod(n_ptr->bclink.last_in))) - return; - - bclink_set_gap(n_ptr); - if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to) - n_ptr->bclink.gap_to = last_sent; - bclink_send_nack(n_ptr); -} - -/** - * tipc_bclink_peek_nack - process a NACK msg meant for another node + * Delay any upcoming NACK by this node if another node has already + * requested the first message this node is going to ask for. * * Only tipc_net_lock set. */ -static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to) +static void bclink_peek_nack(struct tipc_msg *msg) { - struct tipc_node *n_ptr = tipc_node_find(dest); - u32 my_after, my_to; + struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg)); - if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr))) + if (unlikely(!n_ptr)) return; + tipc_node_lock(n_ptr); - /* - * Modify gap to suppress unnecessary NACKs from this node - */ - my_after = n_ptr->bclink.gap_after; - my_to = n_ptr->bclink.gap_to; - - if (less_eq(gap_after, my_after)) { - if (less(my_after, gap_to) && less(gap_to, my_to)) - n_ptr->bclink.gap_after = gap_to; - else if (less_eq(my_to, gap_to)) - n_ptr->bclink.gap_to = n_ptr->bclink.gap_after; - } else if (less_eq(gap_after, my_to)) { - if (less_eq(my_to, gap_to)) - n_ptr->bclink.gap_to = gap_after; - } else { - /* - * Expand gap if missing bufs not in deferred queue: - */ - struct sk_buff *buf = n_ptr->bclink.deferred_head; - u32 prev = n_ptr->bclink.gap_to; - for (; buf; buf = buf->next) { - u32 seqno = buf_seqno(buf); + if (n_ptr->bclink.supported && + (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && + (n_ptr->bclink.last_in == msg_bcgap_after(msg))) + n_ptr->bclink.oos_state = 2; - if (mod(seqno - prev) != 1) { - buf = NULL; - break; - } - if (seqno == gap_after) - break; - prev = seqno; - } - if (buf == NULL) - n_ptr->bclink.gap_to = gap_after; - } - /* - * Some nodes may send a complementary NACK now: - */ - if (bclink_ack_allowed(sender_tag + 1)) { - if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) { - bclink_send_nack(n_ptr); - bclink_set_gap(n_ptr); - } - } tipc_node_unlock(n_ptr); } -/** +/* * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster */ @@ -505,10 +433,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) spin_unlock_bh(&bc_lock); } else { tipc_node_unlock(node); - tipc_bclink_peek_nack(msg_destnode(msg), - msg_bcast_tag(msg), - msg_bcgap_after(msg), - msg_bcgap_to(msg)); + bclink_peek_nack(msg); } goto exit; } @@ -519,16 +444,28 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) next_in = mod(node->bclink.last_in + 1); if (likely(seqno == next_in)) { + bclink_update_last_sent(node, seqno); receive: + node->bclink.last_in = seqno; + node->bclink.oos_state = 0; + spin_lock_bh(&bc_lock); bcl->stats.recv_info++; - node->bclink.last_in++; - bclink_set_gap(node); - if (unlikely(bclink_ack_allowed(seqno))) { - bclink_send_ack(node); + + /* + * Unicast an ACK periodically, ensuring that + * all nodes in the cluster don't ACK at the same time + */ + + if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { + tipc_link_send_proto_msg( + node->active_links[node->addr & 1], + STATE_MSG, 0, 0, 0, 0, 0); bcl->stats.sent_acks++; } + /* Deliver message to destination */ + if (likely(msg_isdata(msg))) { spin_unlock_bh(&bc_lock); tipc_node_unlock(node); @@ -567,9 +504,14 @@ receive: if (unlikely(!tipc_node_is_up(node))) goto unlock; - if (!node->bclink.deferred_head) + if (node->bclink.last_in == node->bclink.last_sent) goto unlock; + if (!node->bclink.deferred_head) { + node->bclink.oos_state = 1; + goto unlock; + } + msg = buf_msg(node->bclink.deferred_head); seqno = msg_seqno(msg); next_in = mod(next_in + 1); @@ -580,31 +522,19 @@ receive: buf = node->bclink.deferred_head; node->bclink.deferred_head = buf->next; + node->bclink.deferred_size--; goto receive; } /* Handle out-of-sequence broadcast message */ if (less(next_in, seqno)) { - u32 gap_after = node->bclink.gap_after; - u32 gap_to = node->bclink.gap_to; - deferred = tipc_link_defer_pkt(&node->bclink.deferred_head, &node->bclink.deferred_tail, buf); - if (deferred) { - node->bclink.nack_sync++; - if (seqno == mod(gap_after + 1)) - node->bclink.gap_after = seqno; - else if (less(gap_after, seqno) && less(seqno, gap_to)) - node->bclink.gap_to = seqno; - } + node->bclink.deferred_size += deferred; + bclink_update_last_sent(node, seqno); buf = NULL; - if (bclink_ack_allowed(node->bclink.nack_sync)) { - if (gap_to != gap_after) - bclink_send_nack(node); - bclink_set_gap(node); - } } else deferred = 0; diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index b009666c60b0..5571394098f9 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -96,7 +96,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf); void tipc_bclink_recv_pkt(struct sk_buff *buf); u32 tipc_bclink_get_last_sent(void); u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr); -void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno); +void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent); int tipc_bclink_stats(char *stats_buf, const u32 buf_size); int tipc_bclink_reset_stats(void); int tipc_bclink_set_queue_limits(u32 limit); diff --git a/net/tipc/link.c b/net/tipc/link.c index 1150ba5a648b..cce953723ddb 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1501,14 +1501,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, tipc_node_lock(n_ptr); tipc_addr_string_fill(addr_string, n_ptr->addr); - info("Multicast link info for %s\n", addr_string); + info("Broadcast link info for %s\n", addr_string); info("Supportable: %d, ", n_ptr->bclink.supportable); info("Supported: %d, ", n_ptr->bclink.supported); info("Acked: %u\n", n_ptr->bclink.acked); info("Last in: %u, ", n_ptr->bclink.last_in); - info("Gap after: %u, ", n_ptr->bclink.gap_after); - info("Gap to: %u\n", n_ptr->bclink.gap_to); - info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync); + info("Oos state: %u, ", n_ptr->bclink.oos_state); + info("Last sent: %u\n", n_ptr->bclink.last_sent); tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); @@ -1974,7 +1973,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, msg_set_type(msg, msg_typ); msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); - msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); + msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); msg_set_last_bcast(msg, tipc_bclink_get_last_sent()); if (msg_typ == STATE_MSG) { @@ -2133,8 +2132,12 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) /* Synchronize broadcast link info, if not done previously */ - if (!tipc_node_is_up(l_ptr->owner)) - l_ptr->owner->bclink.last_in = msg_last_bcast(msg); + if (!tipc_node_is_up(l_ptr->owner)) { + l_ptr->owner->bclink.last_sent = + l_ptr->owner->bclink.last_in = + msg_last_bcast(msg); + l_ptr->owner->bclink.oos_state = 0; + } l_ptr->peer_session = msg_session(msg); l_ptr->peer_bearer_id = msg_bearer_id(msg); @@ -2181,7 +2184,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) /* Protocol message before retransmits, reduce loss risk */ - tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg)); + if (l_ptr->owner->bclink.supported) + tipc_bclink_update_link_state(l_ptr->owner, + msg_last_bcast(msg)); if (rec_gap || (msg_probe(msg))) { tipc_link_send_proto_msg(l_ptr, STATE_MSG, diff --git a/net/tipc/node.c b/net/tipc/node.c index 9196f943b835..6d8bdfd95cd6 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -339,12 +339,12 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Flush broadcast link info associated with lost node */ if (n_ptr->bclink.supported) { - n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0; while (n_ptr->bclink.deferred_head) { struct sk_buff *buf = n_ptr->bclink.deferred_head; n_ptr->bclink.deferred_head = buf->next; buf_discard(buf); } + n_ptr->bclink.deferred_size = 0; if (n_ptr->bclink.defragm) { buf_discard(n_ptr->bclink.defragm); @@ -450,7 +450,7 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space) read_lock_bh(&tipc_net_lock); - /* Get space for all unicast links + multicast link */ + /* Get space for all unicast links + broadcast link */ payload_size = TLV_SPACE(sizeof(link_info)) * (atomic_read(&tipc_num_links) + 1); diff --git a/net/tipc/node.h b/net/tipc/node.h index 90689f487615..c88ce64f8a31 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -66,9 +66,9 @@ * @supported: non-zero if node supports TIPC b'cast capability * @acked: sequence # of last outbound b'cast message acknowledged by node * @last_in: sequence # of last in-sequence b'cast message received from node - * @gap_after: sequence # of last message not requiring a NAK request - * @gap_to: sequence # of last message requiring a NAK request - * @nack_sync: counter that determines when NAK requests should be sent + * @last_sent: sequence # of last b'cast message sent by node + * @oos_state: state tracker for handling OOS b'cast messages + * @deferred_size: number of OOS b'cast messages in deferred queue * @deferred_head: oldest OOS b'cast message received from node * @deferred_tail: newest OOS b'cast message received from node * @defragm: list of partially reassembled b'cast message fragments from node @@ -91,9 +91,9 @@ struct tipc_node { u8 supported; u32 acked; u32 last_in; - u32 gap_after; - u32 gap_to; - u32 nack_sync; + u32 last_sent; + u32 oos_state; + u32 deferred_size; struct sk_buff *deferred_head; struct sk_buff *deferred_tail; struct sk_buff *defragm; -- cgit v1.2.3 From 1ec2bb08407b377e5954b3f9479c2bf67fc925a9 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Thu, 27 Oct 2011 15:03:24 -0400 Subject: tipc: Remove obsolete broadcast tag capability Eliminates support for the broadcast tag field, which is no longer used by broadcast link NACK messages. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 1 - net/tipc/node.c | 7 +------ net/tipc/node.h | 2 -- 3 files changed, 1 insertion(+), 9 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 1f3b1607d9d4..a9b7132d34f2 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -325,7 +325,6 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head ? buf_seqno(n_ptr->bclink.deferred_head) - 1 : n_ptr->bclink.last_sent); - msg_set_bcast_tag(msg, tipc_own_tag); spin_lock_bh(&bc_lock); tipc_bearer_send(&bcbearer->bearer, buf, NULL); diff --git a/net/tipc/node.c b/net/tipc/node.c index 6d8bdfd95cd6..7bc45e135fb4 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -49,9 +49,8 @@ LIST_HEAD(tipc_node_list); static u32 tipc_num_nodes; static atomic_t tipc_num_links = ATOMIC_INIT(0); -u32 tipc_own_tag; -/** +/* * tipc_node_find - locate specified node object, if it exists */ @@ -309,8 +308,6 @@ static void node_established_contact(struct tipc_node *n_ptr) if (n_ptr->bclink.supportable) { tipc_bclink_add_node(n_ptr->addr); n_ptr->bclink.supported = 1; - if (n_ptr->addr < tipc_own_addr) - tipc_own_tag++; } } @@ -353,8 +350,6 @@ static void node_lost_contact(struct tipc_node *n_ptr) tipc_bclink_remove_node(n_ptr->addr); tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ); - if (n_ptr->addr < tipc_own_addr) - tipc_own_tag--; n_ptr->bclink.supported = 0; } diff --git a/net/tipc/node.h b/net/tipc/node.h index c88ce64f8a31..e1b78a2199c2 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -114,8 +114,6 @@ static inline unsigned int tipc_hashfn(u32 addr) return addr & (NODE_HTABLE_SIZE - 1); } -extern u32 tipc_own_tag; - struct tipc_node *tipc_node_find(u32 addr); struct tipc_node *tipc_node_create(u32 addr); void tipc_node_delete(struct tipc_node *n_ptr); -- cgit v1.2.3 From b76b27cad5ade1d483d4b94df6b35976bccf1055 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Thu, 27 Oct 2011 16:31:26 -0400 Subject: tipc: Prevent loss of fragmented messages over unicast links Modifies unicast link endpoint logic so an incoming fragmented message is not lost if reassembly cannot begin because there is no buffer big enough to hold the entire reassembled message. The link endpoint now ignores the first fragment completely, which causes the sending node to retransmit the first fragment so that reassembly can be re-attempted. Previously, the sender would have had no reason to retransmit the 1st fragment, so we would never have a chance to re-try the allocation. Signed-off-by: Allan Stephens --- net/tipc/link.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index cce953723ddb..d8b0a22367b6 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1774,6 +1774,7 @@ protocol_check: head = link_insert_deferred_queue(l_ptr, head); if (likely(msg_is_dest(msg, tipc_own_addr))) { + int ret; deliver: if (likely(msg_isdata(msg))) { tipc_node_unlock(n_ptr); @@ -1798,11 +1799,15 @@ deliver: continue; case MSG_FRAGMENTER: l_ptr->stats.recv_fragments++; - if (tipc_link_recv_fragment(&l_ptr->defragm_buf, - &buf, &msg)) { + ret = tipc_link_recv_fragment( + &l_ptr->defragm_buf, + &buf, &msg); + if (ret == 1) { l_ptr->stats.recv_fragmented++; goto deliver; } + if (ret == -1) + l_ptr->next_in_no--; break; case CHANGEOVER_PROTOCOL: type = msg_type(msg); @@ -2632,7 +2637,9 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, set_fragm_size(pbuf, fragm_sz); set_expected_frags(pbuf, exp_fragm_cnt - 1); } else { - warn("Link unable to reassemble fragmented message\n"); + dbg("Link unable to reassemble fragmented message\n"); + buf_discard(fbuf); + return -1; } buf_discard(fbuf); return 0; -- cgit v1.2.3 From 63e7f1ac2855ba56f15d8189694ca9bd16ae4107 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Thu, 27 Oct 2011 16:43:09 -0400 Subject: tipc: Prevent loss of fragmented messages over broadcast link Modifies broadcast link so that an incoming fragmented message is not lost if reassembly cannot begin because there currently is no buffer big enough to hold the entire reassembled message. The broadcast link now ignores the first fragment completely, which causes the sending node to retransmit the first fragment so that reassembly can be re-attempted. Previously, the sender would have had no reason to retransmit the 1st fragment, so we would never have a chance to re-try the allocation. To do this cleanly without duplicaton, a new bclink_accept_pkt() function is introduced. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 64 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index a9b7132d34f2..41ecf313073c 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -389,7 +389,33 @@ exit: return res; } -/** +/* + * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet + * + * Called with both sending node's lock and bc_lock taken. + */ + +static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) +{ + bclink_update_last_sent(node, seqno); + node->bclink.last_in = seqno; + node->bclink.oos_state = 0; + bcl->stats.recv_info++; + + /* + * Unicast an ACK periodically, ensuring that + * all nodes in the cluster don't ACK at the same time + */ + + if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { + tipc_link_send_proto_msg( + node->active_links[node->addr & 1], + STATE_MSG, 0, 0, 0, 0, 0); + bcl->stats.sent_acks++; + } +} + +/* * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards * * tipc_net_lock is read_locked, no other locks set @@ -443,29 +469,12 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) next_in = mod(node->bclink.last_in + 1); if (likely(seqno == next_in)) { - bclink_update_last_sent(node, seqno); receive: - node->bclink.last_in = seqno; - node->bclink.oos_state = 0; - - spin_lock_bh(&bc_lock); - bcl->stats.recv_info++; - - /* - * Unicast an ACK periodically, ensuring that - * all nodes in the cluster don't ACK at the same time - */ - - if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { - tipc_link_send_proto_msg( - node->active_links[node->addr & 1], - STATE_MSG, 0, 0, 0, 0, 0); - bcl->stats.sent_acks++; - } - /* Deliver message to destination */ if (likely(msg_isdata(msg))) { + spin_lock_bh(&bc_lock); + bclink_accept_pkt(node, seqno); spin_unlock_bh(&bc_lock); tipc_node_unlock(node); if (likely(msg_mcast(msg))) @@ -473,24 +482,35 @@ receive: else buf_discard(buf); } else if (msg_user(msg) == MSG_BUNDLER) { + spin_lock_bh(&bc_lock); + bclink_accept_pkt(node, seqno); bcl->stats.recv_bundles++; bcl->stats.recv_bundled += msg_msgcnt(msg); spin_unlock_bh(&bc_lock); tipc_node_unlock(node); tipc_link_recv_bundle(buf); } else if (msg_user(msg) == MSG_FRAGMENTER) { + int ret = tipc_link_recv_fragment(&node->bclink.defragm, + &buf, &msg); + if (ret < 0) + goto unlock; + spin_lock_bh(&bc_lock); + bclink_accept_pkt(node, seqno); bcl->stats.recv_fragments++; - if (tipc_link_recv_fragment(&node->bclink.defragm, - &buf, &msg)) + if (ret > 0) bcl->stats.recv_fragmented++; spin_unlock_bh(&bc_lock); tipc_node_unlock(node); tipc_net_route_msg(buf); } else if (msg_user(msg) == NAME_DISTRIBUTOR) { + spin_lock_bh(&bc_lock); + bclink_accept_pkt(node, seqno); spin_unlock_bh(&bc_lock); tipc_node_unlock(node); tipc_named_recv(buf); } else { + spin_lock_bh(&bc_lock); + bclink_accept_pkt(node, seqno); spin_unlock_bh(&bc_lock); tipc_node_unlock(node); buf_discard(buf); -- cgit v1.2.3 From 3175bd9add570f3b5c06877369897b334556a2ff Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Fri, 28 Oct 2011 12:03:00 -0400 Subject: tipc: Eliminate alteration of publication key during name table purging Removes code that alters the publication key of a name table entry that is being forcibly purged from TIPC's name table after contact with the publishing node has been lost. Current TIPC ensures that all defunct names are purged before re-establishing contact with a failed node. There used to be a risk that the publication might be accidentally deleted because it might be re-added to the name table before the purge operation was completed. But now there is no longer a need to ensure that the new key is different than the old one. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/name_distr.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 98ebb37f1808..acecfda82f37 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -239,9 +239,6 @@ exit: * * Invoked for each publication issued by a newly failed node. * Removes publication structure from name table & deletes it. - * In rare cases the link may have come back up again when this - * function is called, and we have two items representing the same - * publication. Nudge this item's key to distinguish it from the other. */ static void named_purge_publ(struct publication *publ) @@ -249,7 +246,6 @@ static void named_purge_publ(struct publication *publ) struct publication *p; write_lock_bh(&tipc_nametbl_lock); - publ->key += 1222345; p = tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node, publ->ref, publ->key); if (p) -- cgit v1.2.3 From dff10e9e637663c8c5dd0bae1a8f0e899cbb4a36 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Wed, 2 Nov 2011 10:32:14 -0400 Subject: tipc: Minor optimization to rejection of connection-based messages Modifies message rejection logic so that TIPC doesn't attempt to send a FIN message to the rejecting port if it is known in advance that there is no such message because the rejecting port doesn't exist. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/port.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/net/tipc/port.c b/net/tipc/port.c index d91efc69e6f9..ba3268b8da42 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -400,15 +400,16 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err) /* send self-abort message when rejecting on a connected port */ if (msg_connected(msg)) { - struct sk_buff *abuf = NULL; struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg)); if (p_ptr) { + struct sk_buff *abuf = NULL; + if (p_ptr->connected) abuf = port_build_self_abort_msg(p_ptr, err); tipc_port_unlock(p_ptr); + tipc_net_route_msg(abuf); } - tipc_net_route_msg(abuf); } /* send returned message & dispose of rejected message */ -- cgit v1.2.3