blob: fb95099eea6bc463e1221d8aa6356470f152ad91 [file] [log] [blame]
Eric Newberry185ab292017-03-28 06:45:39 +00001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2014-2017, Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis.
10 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24 */
25
26#include "lp-reliability.hpp"
27#include "generic-link-service.hpp"
28#include "transport.hpp"
29
30namespace nfd {
31namespace face {
32
33LpReliability::LpReliability(const LpReliability::Options& options, GenericLinkService* linkService)
34 : m_options(options)
35 , m_linkService(linkService)
36 , m_firstUnackedFrag(m_unackedFrags.begin())
37 , m_isIdleAckTimerRunning(false)
38{
39 BOOST_ASSERT(m_linkService != nullptr);
40
41 BOOST_ASSERT(m_options.idleAckTimerPeriod > time::nanoseconds::zero());
42}
43
44void
45LpReliability::setOptions(const Options& options)
46{
47 BOOST_ASSERT(options.idleAckTimerPeriod > time::nanoseconds::zero());
48
49 if (m_options.isEnabled && !options.isEnabled) {
50 this->stopIdleAckTimer();
51 }
52
53 m_options = options;
54}
55
56const GenericLinkService*
57LpReliability::getLinkService() const
58{
59 return m_linkService;
60}
61
62void
63LpReliability::observeOutgoing(const std::vector<lp::Packet>& frags)
64{
65 BOOST_ASSERT(m_options.isEnabled);
66
67 // The sequence number of the first fragment is used to identify the NetPkt.
68 lp::Sequence netPktIdentifier = frags.at(0).get<lp::SequenceField>();
69 auto& netPkt = m_netPkts.emplace(netPktIdentifier, NetPkt{}).first->second;
70 auto unackedFragsIt = m_unackedFrags.begin();
71 auto netPktUnackedFragsIt = netPkt.unackedFrags.begin();
72
73 for (const lp::Packet& frag : frags) {
74 // Store LpPacket for future retransmissions
75 lp::Sequence seq = frag.get<lp::SequenceField>();
76 unackedFragsIt = m_unackedFrags.emplace_hint(unackedFragsIt, seq, frag);
77 unackedFragsIt->second.rtoTimer =
78 scheduler::schedule(m_rto.computeRto(), bind(&LpReliability::onLpPacketLost, this, seq));
79 unackedFragsIt->second.sendTime = time::steady_clock::now();
80 netPktUnackedFragsIt = netPkt.unackedFrags.insert(netPktUnackedFragsIt, seq);
81 if (m_unackedFrags.size() == 1) {
82 m_firstUnackedFrag = unackedFragsIt;
83 }
84 }
85}
86
87void
88LpReliability::processIncomingPacket(const lp::Packet& pkt)
89{
90 BOOST_ASSERT(m_options.isEnabled);
91
92 auto now = time::steady_clock::now();
93
94 // Extract and parse Acks
95 for (lp::Sequence ackSeq : pkt.list<lp::AckField>()) {
96 auto txFrag = m_unackedFrags.find(ackSeq);
97 if (txFrag == m_unackedFrags.end()) {
98 // Ignore an Ack for an unknown sequence number
99 continue;
100 }
101
102 // Cancel the RTO timer for the acknowledged fragment
103 txFrag->second.rtoTimer.cancel();
104
105 if (txFrag->second.retxCount == 0) {
106 // This sequence had no retransmissions, so use it to calculate the RTO
107 m_rto.addMeasurement(time::duration_cast<RttEstimator::Duration>(now - txFrag->second.sendTime));
108 }
109
110 // Look for Acks with sequence numbers < ackSeq (allowing for wraparound) and consider them lost
111 // if a configurable number of Acks containing greater sequence numbers have been received.
112 auto lostLpPackets = findLostLpPackets(ackSeq);
113
114 // Remove the fragment from the map of unacknowledged sequences and from its associated network
115 // packet (removing the network packet if it has been received in whole by remote host).
116 // Potentially increment the start of the window.
117 onLpPacketAcknowledged(txFrag, getNetPktByFrag(ackSeq));
118
119 // Resend or fail fragments considered lost. This must be done separately from the above
120 // enhanced for loop because onLpPacketLost may delete the fragment from m_unackedFrags.
121 for (const lp::Sequence& seq : lostLpPackets) {
122 this->onLpPacketLost(seq);
123 }
124 }
125
126 // If has Fragment field, extract Sequence and add to AckQueue
127 if (pkt.has<lp::FragmentField>() && pkt.has<lp::SequenceField>()) {
128 m_ackQueue.push(pkt.get<lp::SequenceField>());
129 if (!m_isIdleAckTimerRunning) {
130 this->startIdleAckTimer();
131 }
132 }
133}
134
135void
136LpReliability::piggyback(lp::Packet& pkt, ssize_t mtu)
137{
138 BOOST_ASSERT(m_options.isEnabled);
139
140 int maxAcks = std::numeric_limits<int>::max();
141 if (mtu > 0) {
142 // Ack Type (3 octets) + Ack Length (1 octet) + sizeof(lp::Sequence)
143 size_t ackSize = 3 + 1 + sizeof(lp::Sequence);
144 ndn::EncodingEstimator estimator;
145 maxAcks = (mtu - pkt.wireEncode(estimator)) / ackSize;
146 }
147
148 ssize_t nAcksInPkt = 0;
149 while (!m_ackQueue.empty() && nAcksInPkt < maxAcks) {
150 pkt.add<lp::AckField>(m_ackQueue.front());
151 m_ackQueue.pop();
152 nAcksInPkt++;
153 }
154}
155
156void
157LpReliability::startIdleAckTimer()
158{
159 BOOST_ASSERT(!m_isIdleAckTimerRunning);
160 m_isIdleAckTimerRunning = true;
161
162 m_idleAckTimer = scheduler::schedule(m_options.idleAckTimerPeriod, [this] {
163 while (!m_ackQueue.empty()) {
164 m_linkService->requestIdlePacket();
165 }
166
167 m_isIdleAckTimerRunning = false;
168 });
169}
170
171void
172LpReliability::stopIdleAckTimer()
173{
174 m_idleAckTimer.cancel();
175 m_isIdleAckTimerRunning = false;
176}
177
178std::vector<lp::Sequence>
179LpReliability::findLostLpPackets(lp::Sequence ackSeq)
180{
181 std::vector<lp::Sequence> lostLpPackets;
182
183 for (auto it = m_firstUnackedFrag; ; ++it) {
184 if (it == m_unackedFrags.end()) {
185 it = m_unackedFrags.begin();
186 }
187
188 if (it->first == ackSeq) {
189 break;
190 }
191
192 auto& unackedFrag = it->second;
193
194 unackedFrag.nGreaterSeqAcks++;
195
196 if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold && !unackedFrag.wasTimedOutBySeq) {
197 unackedFrag.wasTimedOutBySeq = true;
198 lostLpPackets.push_back(it->first);
199 }
200 }
201
202 return lostLpPackets;
203}
204
205void
206LpReliability::onLpPacketLost(lp::Sequence seq)
207{
208 auto& txFrag = m_unackedFrags.at(seq);
209 auto netPktIt = getNetPktByFrag(seq);
210
211 // Check if maximum number of retransmissions exceeded
212 if (txFrag.retxCount >= m_options.maxRetx) {
213 // Delete all LpPackets of NetPkt from TransmitCache
214 lp::Sequence firstSeq = *(netPktIt->second.unackedFrags.begin());
215 lp::Sequence lastSeq = *(std::prev(netPktIt->second.unackedFrags.end()));
216 if (lastSeq >= firstSeq) { // Normal case: no wraparound
217 m_unackedFrags.erase(m_unackedFrags.find(firstSeq), std::next(m_unackedFrags.find(lastSeq)));
218 }
219 else { // sequence number wraparound
220 m_unackedFrags.erase(m_unackedFrags.find(firstSeq), m_unackedFrags.end());
221 m_unackedFrags.erase(m_unackedFrags.begin(), std::next(m_unackedFrags.find(lastSeq)));
222 }
223
224 m_netPkts.erase(netPktIt);
225
226 ++m_linkService->nRetxExhausted;
227 }
228 else {
229 txFrag.retxCount++;
230
231 // Start RTO timer for this sequence
232 txFrag.rtoTimer = scheduler::schedule(m_rto.computeRto(),
233 bind(&LpReliability::onLpPacketLost, this, seq));
234
235 // Retransmit fragment
236 m_linkService->sendLpPacket(lp::Packet(txFrag.pkt));
237 }
238}
239
240void
241LpReliability::onLpPacketAcknowledged(std::map<lp::Sequence, LpReliability::UnackedFrag>::iterator fragIt,
242 std::map<lp::Sequence, LpReliability::NetPkt>::iterator netPktIt)
243{
244 lp::Sequence seq = fragIt->first;
245 // We need to store the sequence of the window begin in case we are erasing it from m_unackedFrags
246 lp::Sequence firstUnackedSeq = m_firstUnackedFrag->first;
247 auto nextSeqIt = m_unackedFrags.erase(fragIt);
248 netPktIt->second.unackedFrags.erase(seq);
249
250 if (!m_unackedFrags.empty() && firstUnackedSeq == seq) {
251 // If "first" fragment in send window (allowing for wraparound), increment window begin
252 if (nextSeqIt == m_unackedFrags.end()) {
253 m_firstUnackedFrag = m_unackedFrags.begin();
254 }
255 else {
256 m_firstUnackedFrag = nextSeqIt;
257 }
258 }
259
260 // Check if network-layer packet completely received. If so, delete network packet mapping
261 // and increment counter
262 if (netPktIt->second.unackedFrags.empty()) {
263 if (netPktIt->second.didRetx) {
264 ++m_linkService->nRetransmitted;
265 }
266 else {
267 ++m_linkService->nAcknowledged;
268 }
269 m_netPkts.erase(netPktIt);
270 }
271}
272
273std::map<lp::Sequence, LpReliability::NetPkt>::iterator
274LpReliability::getNetPktByFrag(lp::Sequence seq)
275{
276 BOOST_ASSERT(!m_netPkts.empty());
277 auto it = m_netPkts.lower_bound(seq);
278 if (it == m_netPkts.end()) {
279 // This can happen because of sequence number wraparound in the middle of a network packet.
280 // In this case, the network packet will be at the end of m_netPkts and we will need to
281 // decrement the iterator to m_netPkts.end() to the one before it.
282 --it;
283 }
284 return it;
285}
286
287LpReliability::UnackedFrag::UnackedFrag(lp::Packet pkt)
288 : pkt(std::move(pkt))
289 , sendTime(time::steady_clock::now())
290 , retxCount(0)
291 , nGreaterSeqAcks(0)
292 , wasTimedOutBySeq(false)
293{
294}
295
296LpReliability::NetPkt::NetPkt()
297 : didRetx(false)
298{
299}
300
301std::ostream&
302operator<<(std::ostream& os, const FaceLogHelper<LpReliability>& flh)
303{
304 os << FaceLogHelper<LinkService>(*flh.obj.getLinkService());
305 return os;
306}
307
308} // namespace face
309} // namespace nfd