Adding implementation of Interest Queue
Change-Id: I4a11db79053e69a19ceba0e0501b93911131d3a2
Refs: #3612
diff --git a/src/interest-queue.cpp b/src/interest-queue.cpp
new file mode 100644
index 0000000..062e222
--- /dev/null
+++ b/src/interest-queue.cpp
@@ -0,0 +1,43 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+* Copyright (c) 2016 Regents of the University of California.
+*
+* This file is part of the nTorrent codebase.
+*
+* nTorrent is free software: you can redistribute it and/or modify it under the
+* terms of the GNU Lesser General Public License as published by the Free Software
+* Foundation, either version 3 of the License, or (at your option) any later version.
+*
+* nTorrent is distributed in the hope that it will be useful, but WITHOUT ANY
+* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+*
+* You should have received copies of the GNU General Public License and GNU Lesser
+* General Public License along with nTorrent, e.g., in COPYING.md file. If not, see
+* <http://www.gnu.org/licenses/>.
+*
+* See AUTHORS for complete list of nTorrent authors and contributors.
+*/
+
+#include "interest-queue.hpp"
+
+namespace ndn {
+namespace ntorrent {
+
+void
+InterestQueue::push(shared_ptr<Interest> interest, DataCallback dataReceivedCallback,
+ TimeoutCallback dataFailedCallback)
+{
+ m_queue.push(std::make_tuple(interest, dataReceivedCallback, dataFailedCallback));
+}
+
+queueTuple
+InterestQueue::pop()
+{
+ queueTuple tup = m_queue.front();
+ m_queue.pop();
+ return tup;
+}
+
+} // namespace ntorrent
+} // namespace ndn
diff --git a/src/interest-queue.hpp b/src/interest-queue.hpp
new file mode 100644
index 0000000..6600297
--- /dev/null
+++ b/src/interest-queue.hpp
@@ -0,0 +1,106 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+* Copyright (c) 2016 Regents of the University of California.
+*
+* This file is part of the nTorrent codebase.
+*
+* nTorrent is free software: you can redistribute it and/or modify it under the
+* terms of the GNU Lesser General Public License as published by the Free Software
+* Foundation, either version 3 of the License, or (at your option) any later version.
+*
+* nTorrent is distributed in the hope that it will be useful, but WITHOUT ANY
+* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+*
+* You should have received copies of the GNU General Public License and GNU Lesser
+* General Public License along with nTorrent, e.g., in COPYING.md file. If not, see
+* <http://www.gnu.org/licenses/>.
+*
+* See AUTHORS for complete list of nTorrent authors and contributors.
+*/
+
+#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/interest.hpp>
+
+#include <queue>
+#include <tuple>
+
+typedef std::tuple<std::shared_ptr<ndn::Interest>, ndn::DataCallback, ndn::TimeoutCallback> queueTuple;
+
+namespace ndn {
+namespace ntorrent {
+
+class InterestQueue
+{
+public:
+ InterestQueue() = default;
+
+ ~InterestQueue() = default;
+
+ /**
+ * @brief Push a tuple to the Interest Queue
+ * @param interest A shared pointer to an Interest
+ * @param dataReceivedCallback Callback to be called when data is received for the given
+ * Interest
+ * @param dataFailedCallback Callback to be called when we fail to retrieve data for the
+ * given Interest
+ *
+ */
+ void
+ push(shared_ptr<Interest> interest, DataCallback dataReceivedCallback,
+ TimeoutCallback dataFailedCallback);
+
+ /**
+ * @brief Pop a tuple from the Interest Queue
+ * @return A tuple of a shared pointer to an Interest, a callaback for successful data
+ * retrieval and a callback for failed data retrieval
+ */
+ queueTuple
+ pop();
+
+ /**
+ * @brief Return the size of the queue (number of tuples)
+ * @return The number of tuples stored in the queue
+ */
+ size_t
+ size() const;
+
+ /**
+ * @brief Check if the queue is empty
+ * @return True if the queue is empty, otherwise false
+ */
+ bool
+ empty() const;
+
+ /**
+ * @brief Return the top element of the Interest queue
+ * @return The top tuple element of the Interest queue
+ */
+ queueTuple
+ front() const;
+
+private:
+ std::queue<queueTuple> m_queue;
+};
+
+inline size_t
+InterestQueue::size() const
+{
+ return m_queue.size();
+}
+
+inline bool
+InterestQueue::empty() const
+{
+ return m_queue.empty();
+}
+
+inline queueTuple
+InterestQueue::front() const
+{
+ return m_queue.front();
+}
+
+} // namespace ntorrent
+} // namespace ndn
diff --git a/src/sequential-data-fetcher.cpp b/src/sequential-data-fetcher.cpp
index 472cd71..758f6f7 100644
--- a/src/sequential-data-fetcher.cpp
+++ b/src/sequential-data-fetcher.cpp
@@ -170,14 +170,6 @@
this->downloadManifestFiles({ interest.getName() });
}
else if (nameType == IoUtil::DATA_PACKET) {
- LOG_ERROR << "Torrent File Segment Downloading Failed: " << interest.getName();
- this->downloadTorrentFile();
- }
- else if (nameType == IoUtil::FILE_MANIFEST) {
- LOG_ERROR << "Manifest File Segment Downloading Failed: " << interest.getName();
- this->downloadManifestFiles({ interest.getName() });
- }
- else if (nameType == IoUtil::DATA_PACKET) {
LOG_ERROR << "Data Packet Downloading Failed: " << interest.getName();
this->downloadPackets({ interest.getName() });
}
diff --git a/src/torrent-manager.cpp b/src/torrent-manager.cpp
index 179893e..110d90f 100644
--- a/src/torrent-manager.cpp
+++ b/src/torrent-manager.cpp
@@ -403,13 +403,16 @@
if (nextSegmentPtr != nullptr) {
this->downloadTorrentFileSegment(*nextSegmentPtr, path, onSuccess, onFailed);
}
- if (!m_seedFlag && m_pendingInterests.empty()) {
+ this->sendInterest();
+ if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
shutdown();
+ return;
}
};
auto dataFailed = [path, name, onSuccess, onFailed, this]
(const Interest& interest) {
+ m_pendingInterests.erase(interest.getName());
++m_retries;
if (m_retries >= MAX_NUM_OF_RETRIES) {
++m_stats_table_iter;
@@ -417,14 +420,15 @@
m_stats_table_iter = m_statsTable.begin();
}
}
+ this->sendInterest();
if (onFailed) {
onFailed(interest.getName(), "Unknown error");
}
this->downloadTorrentFileSegment(name, path, onSuccess, onFailed);
};
- m_pendingInterests.insert(interest->getName());
- LOG_DEBUG << "Sending: " << *interest << std::endl;
- m_face->expressInterest(*interest, dataReceived, dataFailed);
+ LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
+ m_interestQueue->push(interest, dataReceived, dataFailed);
+ this->sendInterest();
}
void
@@ -485,13 +489,16 @@
m_stats_table_iter->incrementReceivedData();
m_retries = 0;
onSuccess(data.getName());
- if (!m_seedFlag && m_pendingInterests.empty()) {
+ this->sendInterest();
+ if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
shutdown();
}
};
+
auto dataFailed = [onFailed, this]
(const Interest& interest) {
m_retries++;
+ m_pendingInterests.erase(interest.getName());
if (m_retries >= MAX_NUM_OF_RETRIES) {
m_stats_table_iter++;
if (m_stats_table_iter == m_statsTable.end()) {
@@ -499,10 +506,11 @@
}
}
onFailed(interest.getName(), "Unknown failure");
+ this->sendInterest();
};
- m_pendingInterests.insert(interest->getName());
- LOG_DEBUG << "Sending: " << *interest << std::endl;
- m_face->expressInterest(*interest, dataReceived, dataFailed);
+ LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
+ m_interestQueue->push(interest, dataReceived, dataFailed);
+ this->sendInterest();
}
void TorrentManager::seed(const Data& data) {
@@ -639,13 +647,16 @@
else {
onSuccess(*packetNames);
}
- if (!m_seedFlag && m_pendingInterests.empty()) {
+ this->sendInterest();
+ if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
shutdown();
+ return;
}
};
auto dataFailed = [packetNames, path, manifestName, onFailed, this]
(const Interest& interest) {
+ m_pendingInterests.erase(interest.getName());
m_retries++;
if (m_retries >= MAX_NUM_OF_RETRIES) {
m_stats_table_iter++;
@@ -653,17 +664,18 @@
m_stats_table_iter = m_statsTable.begin();
}
onFailed(interest.getName(), "Unknown failure");
+ this->sendInterest();
};
- m_pendingInterests.insert(interest->getName());
- LOG_DEBUG << "Sending: " << *interest << std::endl;
- m_face->expressInterest(*interest, dataReceived, dataFailed);
+ LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
+ m_interestQueue->push(interest, dataReceived, dataFailed);
+ this->sendInterest();
}
void
TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
{
// handle if it is a torrent-file
- LOG_DEBUG << "Interest Recevied: " << interest << std::endl;
+ LOG_DEBUG << "Interest Received: " << interest << std::endl;
const auto& interestName = interest.getName();
std::shared_ptr<Data> data = nullptr;
auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
@@ -764,5 +776,19 @@
return interest;
}
+void
+TorrentManager::sendInterest()
+{
+ auto nackCallBack = [](const Interest& i, const lp::Nack& n) {
+ LOG_ERROR << "Nack received: " << n.getReason() << ": " << i << std::endl;
+ };
+ while (m_pendingInterests.size() < WINDOW_SIZE && !m_interestQueue->empty()) {
+ queueTuple tup = m_interestQueue->pop();
+ m_pendingInterests.insert(std::get<0>(tup)->getName());
+ LOG_DEBUG << "Sending: " << *(std::get<0>(tup)) << std::endl;
+ m_face->expressInterest(*std::get<0>(tup), std::get<1>(tup), nackCallBack, std::get<2>(tup));
+ }
+}
+
} // end ntorrent
} // end ndn
diff --git a/src/torrent-manager.hpp b/src/torrent-manager.hpp
index 20fdb02..12096c3 100644
--- a/src/torrent-manager.hpp
+++ b/src/torrent-manager.hpp
@@ -22,6 +22,7 @@
#define INCLUDED_TORRENT_FILE_MANAGER_H
#include "file-manifest.hpp"
+#include "interest-queue.hpp"
#include "torrent-file.hpp"
#include "update-handler.hpp"
@@ -288,7 +289,9 @@
// Number of times to retry if a routable prefix fails to retrieve data
MAX_NUM_OF_RETRIES = 5,
// Number of Interests to be sent before sorting the stats table
- SORTING_INTERVAL = 100
+ SORTING_INTERVAL = 100,
+ // Maximum window size used for sending new Interests out
+ WINDOW_SIZE = 50
};
void onDataReceived(const Data& data);
@@ -320,6 +323,10 @@
shared_ptr<Interest>
createInterest(Name name);
+ void
+ sendInterest();
+
+
// A flag to determine if upon completion we should continue seeding
bool m_seedFlag;
// Face used for network communication
@@ -336,6 +343,8 @@
shared_ptr<KeyChain> m_keyChain;
std::unordered_set<ndn::Name> m_pendingInterests;
+
+ shared_ptr<InterestQueue> m_interestQueue;
// TODO(spyros) Fix and reintegrate update handler
// // Update Handler instance
// shared_ptr<UpdateHandler> m_updateHandler;
@@ -357,6 +366,8 @@
, m_sortingCounter(0)
, m_keyChain(new KeyChain())
{
+ m_interestQueue = make_shared<InterestQueue>();
+
if(face == nullptr) {
m_face = make_shared<Face>();
}
diff --git a/src/util/logging.hpp b/src/util/logging.hpp
index e07626d..d10e926 100644
--- a/src/util/logging.hpp
+++ b/src/util/logging.hpp
@@ -28,7 +28,7 @@
#include <boost/log/sources/severity_logger.hpp>
#include <boost/log/trivial.hpp>
-enum { SEVERITY_THRESHOLD = boost::log::trivial::trace };
+enum { SEVERITY_THRESHOLD = boost::log::trivial::debug };
// register a global logger
BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(logger, boost::log::sources::severity_logger_mt<boost::log::trivial::severity_level>)