Merge remote-tracking branch 'git.irl/alex'
Conflicts:
src/fetcher.cc
diff --git a/cmd/dump-db.cc b/cmd/dump-db.cc
index 5be8719..568c3db 100644
--- a/cmd/dump-db.cc
+++ b/cmd/dump-db.cc
@@ -45,7 +45,7 @@
sqlite3_prepare_v2 (m_db,
"SELECT device_name, seq_no, action, filename, version, file_hash, file_seg_num, parent_device_name, parent_seq_no "
" FROM ActionLog "
- " ORDER BY filename,version", -1, &stmt, 0);
+ /*" ORDER BY filename,version"*/, -1, &stmt, 0);
cout.setf(std::ios::left, std::ios::adjustfield);
cout << ">> ACTION LOG <<" << endl;
diff --git a/log4cxx.properties b/log4cxx.properties
index a3663c4..d5e1569 100644
--- a/log4cxx.properties
+++ b/log4cxx.properties
@@ -1,5 +1,5 @@
-# Set root logger level to DEBUG and its only appender to A1.
+# Set root logger level to TRACE and its appenders to A1 (console) and rollingFile.
-log4j.rootLogger=TRACE, A1
+log4j.rootLogger=TRACE, A1, rollingFile
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
@@ -11,10 +11,20 @@
#log4j.appender.A1.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-14c %m%n
log4j.appender.A1.layout.ConversionPattern=%d{ss,SSS} %-5p %-12c %m%n
-log4j.logger.Executor = ERROR
-log4j.logger.Sync.Log = ERROR
-log4j.logger.Sync.Core = DEBUG
-log4j.logger.Scheduler = ERROR
+log4j.appender.rollingFile=org.apache.log4j.RollingFileAppender
+log4j.appender.rollingFile.File=logfile.txt
+log4j.appender.rollingFile.MaxFileSize=10MB
+log4j.appender.rollingFile.MaxBackupIndex=9
+log4j.appender.rollingFile.layout = org.apache.log4j.PatternLayout
+log4j.appender.rollingFile.layout.ConversionPattern=%d{ss,SSS} %-5p %-12c %m%n
+
+#log4j.logger.Executor = ERROR
+#log4j.logger.Sync.Log = ERROR
+#log4j.logger.Sync.Core = TRACE
+#log4j.logger.Scheduler = ERROR
+#
+#log4j.logger.Fetcher = TRACE
+#log4j.logger.FetchManager = TRACE
#log4j.logger.Sync = DEBUG
#log4j.logger.Sync.Log = ERROR
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 2192fd7..b7f6e35 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -99,6 +99,11 @@
if (m_outOfOrderRecvSeqNo.find (m_minSendSeqNo+1) != m_outOfOrderRecvSeqNo.end ())
continue;
+ if (m_inActivePipeline.find (m_minSendSeqNo+1) != m_inActivePipeline.end ())
+ continue;
+
+ m_inActivePipeline.insert (m_minSendSeqNo+1);
+
_LOG_DEBUG (" >>> i " << Name (m_forwardingHint)(m_name) << ", seq = " << (m_minSendSeqNo + 1 ));
// cout << ">>> " << m_minSendSeqNo+1 << endl;
@@ -148,26 +153,37 @@
////////////////////////////////////////////////////////////////////////////
m_outOfOrderRecvSeqNo.insert (seqno);
+ m_inActivePipeline.erase (seqno);
+ _LOG_DEBUG ("Total segments received: " << m_outOfOrderRecvSeqNo.size ());
set<int64_t>::iterator inOrderSeqNo = m_outOfOrderRecvSeqNo.begin ();
for (; inOrderSeqNo != m_outOfOrderRecvSeqNo.end ();
inOrderSeqNo++)
{
+ _LOG_TRACE ("Checking " << *inOrderSeqNo << " and " << m_maxInOrderRecvSeqNo+1);
if (*inOrderSeqNo == m_maxInOrderRecvSeqNo+1)
{
m_maxInOrderRecvSeqNo = *inOrderSeqNo;
}
+ else if (*inOrderSeqNo < m_maxInOrderRecvSeqNo+1) // not possible anymore, but just in case
+ {
+ continue;
+ }
else
break;
}
m_outOfOrderRecvSeqNo.erase (m_outOfOrderRecvSeqNo.begin (), inOrderSeqNo);
////////////////////////////////////////////////////////////////////////////
+ _LOG_TRACE ("Max in order received: " << m_maxInOrderRecvSeqNo << ", max seqNo to request: " << m_maxSeqNo);
+
if (m_maxInOrderRecvSeqNo == m_maxSeqNo)
{
+ _LOG_TRACE ("Fetch finished");
m_active = false;
// invoke callback
if (!m_finishCallback.empty ())
{
+ _LOG_TRACE ("Notifying callback");
m_finishCallback(m_deviceName, m_name);
}
@@ -194,10 +210,13 @@
if (m_lastPositiveActivity <
(date_time::second_clock<boost::posix_time::ptime>::universal_time() - m_maximumNoActivityPeriod))
{
+ m_inActivePipeline.erase (seqno);
m_activePipeline --;
if (m_activePipeline == 0)
{
_LOG_DEBUG ("Telling that fetch failed");
+ _LOG_DEBUG ("Active pipeline size should be zero: " << m_inActivePipeline.size ());
+
m_active = false;
m_onFetchFailed (*this);
-// this is not valid anymore, but we still should be able finish work
+// this is not valid anymore, but we should still be able to finish work
@@ -205,7 +224,7 @@
}
else
{
- _LOG_DEBUG ("Asking to reexpress");
+ _LOG_DEBUG ("Asking to reexpress seqno: " << seqno);
m_ccnx->sendInterest (name, closure, selectors);
}
}
diff --git a/src/fetcher.h b/src/fetcher.h
index 7ab08c9..53d8562 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -112,6 +112,7 @@
int64_t m_minSendSeqNo;
int64_t m_maxInOrderRecvSeqNo;
std::set<int64_t> m_outOfOrderRecvSeqNo;
+ std::set<int64_t> m_inActivePipeline;
int64_t m_minSeqNo;
int64_t m_maxSeqNo;