Bug: there was a subtle bug in fetcher (in some cases, same segment were rerequested)
diff --git a/src/fetcher.cc b/src/fetcher.cc
index a67f76a..ea750e4 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -99,6 +99,11 @@
if (m_outOfOrderRecvSeqNo.find (m_minSendSeqNo+1) != m_outOfOrderRecvSeqNo.end ())
continue;
+ if (m_inActivePipeline.find (m_minSendSeqNo+1) != m_inActivePipeline.end ())
+ continue;
+
+ m_inActivePipeline.insert (m_minSendSeqNo+1);
+
_LOG_DEBUG (" >>> i " << Name (m_forwardingHint)(m_name) << ", seq = " << (m_minSendSeqNo + 1 ));
// cout << ">>> " << m_minSendSeqNo+1 << endl;
@@ -148,26 +153,37 @@
////////////////////////////////////////////////////////////////////////////
m_outOfOrderRecvSeqNo.insert (seqno);
+ m_inActivePipeline.erase (seqno);
+ _LOG_DEBUG ("Total segments received: " << m_outOfOrderRecvSeqNo.size ());
set<int64_t>::iterator inOrderSeqNo = m_outOfOrderRecvSeqNo.begin ();
for (; inOrderSeqNo != m_outOfOrderRecvSeqNo.end ();
inOrderSeqNo++)
{
+ _LOG_TRACE ("Checking " << *inOrderSeqNo << " and " << m_maxInOrderRecvSeqNo+1);
if (*inOrderSeqNo == m_maxInOrderRecvSeqNo+1)
{
m_maxInOrderRecvSeqNo = *inOrderSeqNo;
}
+ else if (*inOrderSeqNo < m_maxInOrderRecvSeqNo+1) // not possible anymore, but just in case
+ {
+ continue;
+ }
else
break;
}
m_outOfOrderRecvSeqNo.erase (m_outOfOrderRecvSeqNo.begin (), inOrderSeqNo);
////////////////////////////////////////////////////////////////////////////
+ _LOG_TRACE ("Max in order received: " << m_maxInOrderRecvSeqNo << ", max seqNo to request: " << m_maxSeqNo);
+
if (m_maxInOrderRecvSeqNo == m_maxSeqNo)
{
+ _LOG_TRACE ("Fetch finished");
m_active = false;
// invoke callback
if (!m_finishCallback.empty ())
{
+ _LOG_TRACE ("Notifying callback");
m_finishCallback(m_deviceName, m_name);
}
@@ -194,10 +210,13 @@
if (m_lastPositiveActivity <
(date_time::second_clock<boost::posix_time::ptime>::universal_time() - m_maximumNoActivityPeriod))
{
+ m_inActivePipeline.erase (seqno);
m_activePipeline --;
if (m_activePipeline == 0)
{
_LOG_DEBUG ("Telling that fetch failed");
+ _LOG_DEBUG ("Active pipeline size should be zero: " << m_inActivePipeline.size ());
+
m_active = false;
m_onFetchFailed (*this);
// this is not valid anymore, but we still should be able finish work
@@ -206,7 +225,7 @@
}
else
{
- _LOG_DEBUG ("Asking to reexpress");
+ _LOG_DEBUG ("Asking to reexpress seqno: " << seqno);
return Closure::RESULT_REEXPRESS;
}
}
diff --git a/src/fetcher.h b/src/fetcher.h
index 6a31b5e..0786905 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -112,6 +112,7 @@
int64_t m_minSendSeqNo;
int64_t m_maxInOrderRecvSeqNo;
std::set<int64_t> m_outOfOrderRecvSeqNo;
+ std::set<int64_t> m_inActivePipeline;
int64_t m_minSeqNo;
int64_t m_maxSeqNo;