Adding implementation of Interest Queue
Change-Id: I4a11db79053e69a19ceba0e0501b93911131d3a2
Refs: #3612
diff --git a/src/torrent-manager.cpp b/src/torrent-manager.cpp
index 179893e..110d90f 100644
--- a/src/torrent-manager.cpp
+++ b/src/torrent-manager.cpp
@@ -403,13 +403,16 @@
if (nextSegmentPtr != nullptr) {
this->downloadTorrentFileSegment(*nextSegmentPtr, path, onSuccess, onFailed);
}
- if (!m_seedFlag && m_pendingInterests.empty()) {
+ this->sendInterest();
+ if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
shutdown();
+ return;
}
};
auto dataFailed = [path, name, onSuccess, onFailed, this]
(const Interest& interest) {
+ m_pendingInterests.erase(interest.getName());
++m_retries;
if (m_retries >= MAX_NUM_OF_RETRIES) {
++m_stats_table_iter;
@@ -417,14 +420,15 @@
m_stats_table_iter = m_statsTable.begin();
}
}
+ this->sendInterest();
if (onFailed) {
onFailed(interest.getName(), "Unknown error");
}
this->downloadTorrentFileSegment(name, path, onSuccess, onFailed);
};
- m_pendingInterests.insert(interest->getName());
- LOG_DEBUG << "Sending: " << *interest << std::endl;
- m_face->expressInterest(*interest, dataReceived, dataFailed);
+ LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
+ m_interestQueue->push(interest, dataReceived, dataFailed);
+ this->sendInterest();
}
void
@@ -485,13 +489,16 @@
m_stats_table_iter->incrementReceivedData();
m_retries = 0;
onSuccess(data.getName());
- if (!m_seedFlag && m_pendingInterests.empty()) {
+ this->sendInterest();
+ if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
shutdown();
}
};
+
auto dataFailed = [onFailed, this]
(const Interest& interest) {
m_retries++;
+ m_pendingInterests.erase(interest.getName());
if (m_retries >= MAX_NUM_OF_RETRIES) {
m_stats_table_iter++;
if (m_stats_table_iter == m_statsTable.end()) {
@@ -499,10 +506,11 @@
}
}
onFailed(interest.getName(), "Unknown failure");
+ this->sendInterest();
};
- m_pendingInterests.insert(interest->getName());
- LOG_DEBUG << "Sending: " << *interest << std::endl;
- m_face->expressInterest(*interest, dataReceived, dataFailed);
+ LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
+ m_interestQueue->push(interest, dataReceived, dataFailed);
+ this->sendInterest();
}
void TorrentManager::seed(const Data& data) {
@@ -639,13 +647,16 @@
else {
onSuccess(*packetNames);
}
- if (!m_seedFlag && m_pendingInterests.empty()) {
+ this->sendInterest();
+ if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
shutdown();
+ return;
}
};
auto dataFailed = [packetNames, path, manifestName, onFailed, this]
(const Interest& interest) {
+ m_pendingInterests.erase(interest.getName());
m_retries++;
if (m_retries >= MAX_NUM_OF_RETRIES) {
m_stats_table_iter++;
@@ -653,17 +664,18 @@
m_stats_table_iter = m_statsTable.begin();
}
onFailed(interest.getName(), "Unknown failure");
+ this->sendInterest();
};
- m_pendingInterests.insert(interest->getName());
- LOG_DEBUG << "Sending: " << *interest << std::endl;
- m_face->expressInterest(*interest, dataReceived, dataFailed);
+ LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
+ m_interestQueue->push(interest, dataReceived, dataFailed);
+ this->sendInterest();
}
void
TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
{
// handle if it is a torrent-file
- LOG_DEBUG << "Interest Recevied: " << interest << std::endl;
+ LOG_DEBUG << "Interest Received: " << interest << std::endl;
const auto& interestName = interest.getName();
std::shared_ptr<Data> data = nullptr;
auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
@@ -764,5 +776,41 @@
return interest;
}
+/**
+ * @brief Express queued Interests while the sending window has room.
+ *
+ * Pops entries from m_interestQueue and expresses them on m_face until the queue is
+ * empty or m_pendingInterests holds WINDOW_SIZE names.
+ *
+ * A Nack is treated like a timeout: the name is erased from m_pendingInterests and the
+ * entry's timeout callback runs. Logging alone would leave the nacked name in
+ * m_pendingInterests, permanently shrinking the window and blocking shutdown.
+ */
+void
+TorrentManager::sendInterest()
+{
+  while (m_pendingInterests.size() < WINDOW_SIZE && !m_interestQueue->empty()) {
+    queueTuple tup = m_interestQueue->pop();
+    const auto& interest = std::get<0>(tup);
+    // Copied into the Nack handler, which outlives this loop iteration.
+    // NOTE(review): assumes the callback is std::function-like (testable for emptiness).
+    const auto onTimeout = std::get<2>(tup);
+    auto onNack = [this, onTimeout](const Interest& i, const lp::Nack& n) {
+      LOG_ERROR << "Nack received: " << n.getReason() << ": " << i << std::endl;
+      // Free the window slot; a second erase inside the timeout callback is harmless.
+      m_pendingInterests.erase(i.getName());
+      if (onTimeout) {
+        onTimeout(i);
+      }
+      else {
+        this->sendInterest();
+      }
+    };
+    m_pendingInterests.insert(interest->getName());
+    LOG_DEBUG << "Sending: " << *interest << std::endl;
+    m_face->expressInterest(*interest, std::get<1>(tup), onNack, onTimeout);
+  }
+}
+
} // end ntorrent
} // end ndn