Implemented a set to track pending Interests in TorrentManager.
Change-Id: I4a11db79053e69a19ceba0e0501b93911131d1d7
diff --git a/src/main.cpp b/src/main.cpp
index 1deee0b..c4240d9 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -141,8 +141,9 @@
throw ndn::Error("wrong number of arguments for torrent");
}
auto torrentName = args[0];
- auto dataPath = args[1];
- SequentialDataFetcher fetcher(torrentName, dataPath);
+ auto dataPath = args[1];
+ auto seedFlag = (vm.count("seed") != 0);
+ SequentialDataFetcher fetcher(torrentName, dataPath, seedFlag);
fetcher.start();
}
}
diff --git a/src/sequential-data-fetcher.cpp b/src/sequential-data-fetcher.cpp
index 9a5ae5f..472cd71 100644
--- a/src/sequential-data-fetcher.cpp
+++ b/src/sequential-data-fetcher.cpp
@@ -26,12 +26,14 @@
namespace ndn {
namespace ntorrent {
-SequentialDataFetcher::SequentialDataFetcher(const ndn::Name& torrentFileName,
- const std::string& dataPath)
+SequentialDataFetcher::SequentialDataFetcher(const ndn::Name& torrentFileName,
+ const std::string& dataPath,
+ bool seed)
: m_dataPath(dataPath)
, m_torrentFileName(torrentFileName)
+ , m_seedFlag(seed)
{
- m_manager = make_shared<TorrentManager>(m_torrentFileName, m_dataPath);
+ m_manager = make_shared<TorrentManager>(m_torrentFileName, m_dataPath, seed);
}
SequentialDataFetcher::~SequentialDataFetcher()
@@ -121,6 +123,9 @@
}
else {
LOG_INFO << "All data complete" << std::endl;
+ if (!m_seedFlag) {
+ m_manager->shutdown();
+ }
}
}
}
@@ -136,6 +141,8 @@
void
SequentialDataFetcher::onTorrentFileSegmentReceived(const std::vector<Name>& manifestNames)
{
+ // TODO(msweatt) Add parameter for torrent file
+ LOG_INFO << "Torrent Segment Received: " << m_torrentFileName << std::endl;
this->downloadManifestFiles(manifestNames);
}
diff --git a/src/sequential-data-fetcher.hpp b/src/sequential-data-fetcher.hpp
index 4b050db..ba99c79 100644
--- a/src/sequential-data-fetcher.hpp
+++ b/src/sequential-data-fetcher.hpp
@@ -47,8 +47,9 @@
* @param dataPath The path that the manager would look for already stored data packets and
* will write new data packets
*/
- SequentialDataFetcher(const ndn::Name& torrentFileName,
- const std::string& dataPath);
+ SequentialDataFetcher(const ndn::Name& torrentFileName,
+ const std::string& dataPath,
+ bool seed = true);
~SequentialDataFetcher();
@@ -99,6 +100,7 @@
std::string m_dataPath;
ndn::Name m_torrentFileName;
shared_ptr<TorrentManager> m_manager;
+ bool m_seedFlag;
};
} // namespace ntorrent
diff --git a/src/torrent-manager.cpp b/src/torrent-manager.cpp
index 8e525e0..179893e 100644
--- a/src/torrent-manager.cpp
+++ b/src/torrent-manager.cpp
@@ -381,6 +381,7 @@
auto dataReceived = [path, onSuccess, onFailed, this]
(const Interest& interest, const Data& data) {
+ m_pendingInterests.erase(interest.getName());
// Stats Table update here...
m_stats_table_iter->incrementReceivedData();
m_retries = 0;
@@ -402,6 +403,9 @@
if (nextSegmentPtr != nullptr) {
this->downloadTorrentFileSegment(*nextSegmentPtr, path, onSuccess, onFailed);
}
+ if (!m_seedFlag && m_pendingInterests.empty()) {
+ shutdown();
+ }
};
auto dataFailed = [path, name, onSuccess, onFailed, this]
@@ -418,7 +422,8 @@
}
this->downloadTorrentFileSegment(name, path, onSuccess, onFailed);
};
- LOG_TRACE << "Sending: " << *interest << std::endl;
+ m_pendingInterests.insert(interest->getName());
+ LOG_DEBUG << "Sending: " << *interest << std::endl;
m_face->expressInterest(*interest, dataReceived, dataFailed);
}
@@ -471,6 +476,7 @@
auto dataReceived = [onSuccess, onFailed, this]
(const Interest& interest, const Data& data) {
+ m_pendingInterests.erase(interest.getName());
// Write data to disk...
if(writeData(data)) {
seed(data);
@@ -479,6 +485,9 @@
m_stats_table_iter->incrementReceivedData();
m_retries = 0;
onSuccess(data.getName());
+ if (!m_seedFlag && m_pendingInterests.empty()) {
+ shutdown();
+ }
};
auto dataFailed = [onFailed, this]
(const Interest& interest) {
@@ -491,7 +500,8 @@
}
onFailed(interest.getName(), "Unknown failure");
};
- LOG_TRACE << "Sending: " << *interest << std::endl;
+ m_pendingInterests.insert(interest->getName());
+ LOG_DEBUG << "Sending: " << *interest << std::endl;
m_face->expressInterest(*interest, dataReceived, dataFailed);
}
@@ -605,6 +615,7 @@
auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
(const Interest& interest, const Data& data) {
+ m_pendingInterests.erase(interest.getName());
// Stats Table update here...
m_stats_table_iter->incrementReceivedData();
m_retries = 0;
@@ -628,6 +639,9 @@
else {
onSuccess(*packetNames);
}
+ if (!m_seedFlag && m_pendingInterests.empty()) {
+ shutdown();
+ }
};
auto dataFailed = [packetNames, path, manifestName, onFailed, this]
@@ -640,7 +654,8 @@
}
onFailed(interest.getName(), "Unknown failure");
};
- LOG_TRACE << "Sending: " << *interest << std::endl;
+ m_pendingInterests.insert(interest->getName());
+ LOG_DEBUG << "Sending: " << *interest << std::endl;
m_face->expressInterest(*interest, dataReceived, dataFailed);
}
@@ -648,7 +663,7 @@
TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
{
// handle if it is a torrent-file
- LOG_TRACE << "Interest Recevied: " << interest << std::endl;
+ LOG_DEBUG << "Interest Recevied: " << interest << std::endl;
const auto& interestName = interest.getName();
std::shared_ptr<Data> data = nullptr;
auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
@@ -712,7 +727,7 @@
LOG_ERROR << "ERROR: Failed to register prefix \""
<< prefix << "\" in local hub's daemon (" << reason << ")"
<< std::endl;
- m_face->shutdown();
+ shutdown();
}
shared_ptr<Interest>
diff --git a/src/torrent-manager.hpp b/src/torrent-manager.hpp
index a3197d1..20fdb02 100644
--- a/src/torrent-manager.hpp
+++ b/src/torrent-manager.hpp
@@ -38,6 +38,7 @@
#include <memory>
#include <string>
#include <unordered_map>
+#include <unordered_set>
#include <vector>
namespace fs = boost::filesystem;
@@ -68,6 +69,7 @@
*/
TorrentManager(const ndn::Name& torrentFileName,
const std::string& dataPath,
+ bool seed = true,
std::shared_ptr<Face> face = nullptr);
/*
@@ -147,6 +149,11 @@
findAllMissingDataPackets(std::vector<Name>& packetNames) const;
/*
+ * @brief Stop all network activities of this manager
+ */
+ void
+ shutdown();
+ /*
* @brief Download the torrent file
* @param path The path to write the downloaded segments
* @param onSuccess Callback to be called if we successfully download all the
@@ -313,10 +320,12 @@
shared_ptr<Interest>
createInterest(Name name);
- // Stats table where routable prefixes are stored
- StatsTable m_statsTable;
+ // A flag to determine if upon completion we should continue seeding
+ bool m_seedFlag;
// Face used for network communication
std::shared_ptr<Face> m_face;
+ // Stats table where routable prefixes are stored
+ StatsTable m_statsTable;
// Iterator to the routable prefix that we currently use
StatsTable::iterator m_stats_table_iter;
// Number of retries per routable prefix
@@ -325,6 +334,8 @@
uint64_t m_sortingCounter;
// Keychain instance
shared_ptr<KeyChain> m_keyChain;
+ // Names of Interests that were expressed and have not been answered with Data yet
+ std::unordered_set<ndn::Name> m_pendingInterests;
// TODO(spyros) Fix and reintegrate update handler
// // Update Handler instance
// shared_ptr<UpdateHandler> m_updateHandler;
@@ -333,12 +344,14 @@
inline
TorrentManager::TorrentManager(const ndn::Name& torrentFileName,
const std::string& dataPath,
+ bool seed,
std::shared_ptr<Face> face)
: m_fileStates()
, m_torrentSegments()
, m_fileManifests()
, m_torrentFileName(torrentFileName)
, m_dataPath(dataPath)
+, m_seedFlag(seed)
, m_face(face)
, m_retries(0)
, m_sortingCounter(0)
@@ -369,6 +382,14 @@
return findTorrentFileSegmentToDownload() == nullptr;
}
+inline
+void
+TorrentManager::shutdown()
+{
+ // TODO(msweatt) Consider unregistering all prefixes to exit more gracefully
+ if (m_face != nullptr) { m_face->shutdown(); } // the ctor's face argument defaults to nullptr
+}
+
} // end ntorrent
} // end ndn
diff --git a/src/util/logging.hpp b/src/util/logging.hpp
index bb926a4..e07626d 100644
--- a/src/util/logging.hpp
+++ b/src/util/logging.hpp
@@ -28,7 +28,7 @@
#include <boost/log/sources/severity_logger.hpp>
#include <boost/log/trivial.hpp>
-enum { SEVERITY_THRESHOLD = boost::log::trivial::warning };
+enum { SEVERITY_THRESHOLD = boost::log::trivial::trace };
// register a global logger
BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(logger, boost::log::sources::severity_logger_mt<boost::log::trivial::severity_level>)