More bug fixes to complete basic operations of the application. Additionally implemented simple dump
option in application.
Change-Id: I3a47580af916482b0d901456455a9395d89048c7
diff --git a/src/fetching-strategy-manager.hpp b/src/fetching-strategy-manager.hpp
index a34afa0..b81faf2 100644
--- a/src/fetching-strategy-manager.hpp
+++ b/src/fetching-strategy-manager.hpp
@@ -46,7 +46,7 @@
* @brief Method called to start the torrent downloading
*/
virtual void
- start() = 0;
+ start(const time::milliseconds& timeout = time::milliseconds::zero()) = 0;
/**
* @brief Method called to pause the torrent downloading
@@ -67,13 +67,6 @@
struct status {
double downloadedPercent;
};
- /**
- * @brief Seed downloaded data for the specified timeout.
- * By default this will go into an infinite loop.
- */
- virtual void
- seed(const time::milliseconds& timeout = time::milliseconds::zero()) const = 0;
-
private:
/**
* @brief Callback to be called when data is received
@@ -88,6 +81,12 @@
onDataRetrievalFailure(const ndn::Interest& interest, const std::string& errorCode) = 0;
/**
+ * @brief Callback to be called when a torrent file segment is received
+ */
+ virtual void
+ onTorrentFileSegmentReceived(const std::vector<Name>& manifestNames) = 0;
+
+ /**
* @brief Callback to be called when a file manifest is received
*/
virtual void
diff --git a/src/main.cpp b/src/main.cpp
index 5500ada..1deee0b 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -18,8 +18,6 @@
*
* See AUTHORS.md for complete list of nTorrent authors and contributors.
*/
-
-
#include "sequential-data-fetcher.hpp"
#include "torrent-file.hpp"
#include "util/io-util.hpp"
@@ -37,6 +35,7 @@
namespace logging = boost::log;
namespace po = boost::program_options;
+using namespace ndn;
using namespace ndn::ntorrent;
namespace ndn {
@@ -66,6 +65,7 @@
("help,h", "produce help message")
("generate,g" , "-g <data directory> <output-path>? <names-per-segment>? <names-per-manifest-segment>? <data-packet-size>?")
("seed,s", "After download completes, continue to seed")
+ ("dump,d", "-d <file> Dump the contents of the Data stored at the <file>.")
("args", po::value<std::vector<std::string> >(), "For arguments you want to specify without flags")
;
po::positional_options_description p;
@@ -80,60 +80,70 @@
std::cout << desc << std::endl;
return 1;
}
- // if generate mode
- if (vm.count("generate") && vm.count("args")) {
+ if (vm.count("args")) {
auto args = vm["args"].as<std::vector<std::string>>();
- if (args.size() < 1 || args.size() > 5) {
- throw ndn::Error("wrong number of arguments for generate");
- }
- auto dataPath = args[0];
- auto outputPath = args.size() >= 2 ? args[1] : ".appdata/";
- auto namesPerSegment = args.size() >= 3 ? boost::lexical_cast<size_t>(args[2]) : 1024;
- auto namesPerManifest = args.size() >= 4 ? boost::lexical_cast<size_t>(args[3]) : 1024;
- auto dataPacketSize = args.size() == 5 ? boost::lexical_cast<size_t>(args[4]) : 1024;
+ // if generate mode
+ if (vm.count("generate")) {
+ if (args.size() < 1 || args.size() > 5) {
+ throw ndn::Error("wrong number of arguments for generate");
+ }
+ auto dataPath = args[0];
+ auto outputPath = args.size() >= 2 ? args[1] : ".appdata/";
+ auto namesPerSegment = args.size() >= 3 ? boost::lexical_cast<size_t>(args[2]) : 1024;
+ auto namesPerManifest = args.size() >= 4 ? boost::lexical_cast<size_t>(args[3]) : 1024;
+ auto dataPacketSize = args.size() == 5 ? boost::lexical_cast<size_t>(args[4]) : 1024;
- const auto& content = TorrentFile::generate(dataPath,
- namesPerSegment,
- namesPerManifest,
- dataPacketSize);
- const auto& torrentSegments = content.first;
- std::vector<FileManifest> manifests;
- for (const auto& ms : content.second) {
- manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
- }
- auto torrentPrefix = fs::canonical(dataPath).filename().string();
- outputPath += torrentPrefix;
- auto torrentPath = outputPath + "/torrent_files/";
- // write all the torrent segments
- for (const TorrentFile& t : torrentSegments) {
- if (!IoUtil::writeTorrentSegment(t, torrentPath)) {
- LOG_ERROR << "Write failed: " << t.getName() << std::endl;
- return -1;
+ const auto& content = TorrentFile::generate(dataPath,
+ namesPerSegment,
+ namesPerManifest,
+ dataPacketSize);
+ const auto& torrentSegments = content.first;
+ std::vector<FileManifest> manifests;
+ for (const auto& ms : content.second) {
+ manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
+ }
+ auto torrentPrefix = fs::canonical(dataPath).filename().string();
+ outputPath += torrentPrefix;
+ auto torrentPath = outputPath + "/torrent_files/";
+ // write all the torrent segments
+ for (const TorrentFile& t : torrentSegments) {
+ if (!IoUtil::writeTorrentSegment(t, torrentPath)) {
+ LOG_ERROR << "Write failed: " << t.getName() << std::endl;
+ return -1;
+ }
+ }
+ auto manifestPath = outputPath + "/manifests/";
+ for (const FileManifest& m : manifests) {
+ if (!IoUtil::writeFileManifest(m, manifestPath)) {
+ LOG_ERROR << "Write failed: " << m.getName() << std::endl;
+ return -1;
+ }
}
}
- auto manifestPath = outputPath + "/manifests/";
- for (const FileManifest& m : manifests) {
- if (!IoUtil::writeFileManifest(m, manifestPath)) {
- LOG_ERROR << "Write failed: " << m.getName() << std::endl;
- return -1;
+ // if dump mode
+ else if(vm.count("dump")) {
+ if (args.size() != 1) {
+ throw ndn::Error("wrong number of arguments for dump");
+ }
+ auto filePath = args[0];
+ auto data = io::load<Data>(filePath);
+ if (nullptr != data) {
+ std::cout << data->getFullName() << std::endl;
+ }
+ else {
+ throw ndn::Error("Invalid data.");
}
}
- }
- // otherwise we are in torrent mode if we have the required args, start the sequential fetcher
- else if (vm.count("args")) {
- // <torrent-file-name> <data-path>
- auto args = vm["args"].as<std::vector<std::string>>();
- if (args.size() != 2) {
- throw ndn::Error("wrong number of arguments for generate");
- }
- auto torrentName = args[0];
- auto dataPath = args[1];
- SequentialDataFetcher fetcher(torrentName, dataPath);
- // download all the code
- fetcher.start();
- std::cout << torrentName << " done" << std::endl;
- if (vm.count("seed")) {
- fetcher.seed();
+ // standard torrent mode
+ else {
+ // <torrent-file-name> <data-path>
+ if (args.size() != 2) {
+ throw ndn::Error("wrong number of arguments for torrent");
+ }
+ auto torrentName = args[0];
+ auto dataPath = args[1];
+ SequentialDataFetcher fetcher(torrentName, dataPath);
+ fetcher.start();
}
}
else {
@@ -149,4 +159,4 @@
std::cerr << "Exception of unknown type!\n";
}
return 0;
-}
\ No newline at end of file
+}
diff --git a/src/sequential-data-fetcher.cpp b/src/sequential-data-fetcher.cpp
index b994384..9a5ae5f 100644
--- a/src/sequential-data-fetcher.cpp
+++ b/src/sequential-data-fetcher.cpp
@@ -39,11 +39,12 @@
}
void
-SequentialDataFetcher::start()
+SequentialDataFetcher::start(const time::milliseconds& timeout)
{
m_manager->Initialize();
// downloading logic
this->implementSequentialLogic();
+ m_manager->processEvents(timeout);
}
void
@@ -62,28 +63,24 @@
throw(Error("Not implemented yet"));
}
-std::vector<ndn::Name>
+void
SequentialDataFetcher::downloadTorrentFile()
{
- std::vector<ndn::Name> returnedNames;
- returnedNames = m_manager->downloadTorrentFile(".appdata/torrent_files/");
- if (!returnedNames.empty() && IoUtil::NAME_TYPE::FILE_MANIFEST == IoUtil::findType(returnedNames[0])) {
- LOG_INFO << "Torrent File Received: "
- << m_torrentFileName.getSubName(0, m_torrentFileName.size() - 1) << std::endl;
- }
- return returnedNames;
+ auto torrentPath = ".appdata/" + m_torrentFileName.get(-3).toUri() + "/torrent_files/";
+ m_manager->downloadTorrentFile(torrentPath,
+ bind(&SequentialDataFetcher::onTorrentFileSegmentReceived, this, _1),
+ bind(&SequentialDataFetcher::onDataRetrievalFailure, this, _1, _2));
}
void
-SequentialDataFetcher::downloadManifestFiles(const std::vector<ndn::Name>& manifestsName)
+SequentialDataFetcher::downloadManifestFiles(const std::vector<ndn::Name>& manifestNames)
{
- std::vector<ndn::Name> packetsName;
- for (auto i = manifestsName.begin(); i != manifestsName.end(); i++) {
+ auto manifestPath = ".appdata/" + m_torrentFileName.get(-3).toUri() + "/manifests/";
+ for (auto i = manifestNames.begin(); i != manifestNames.end(); i++) {
m_manager->download_file_manifest(*i,
- ".appdata/manifests/",
+ manifestPath,
bind(&SequentialDataFetcher::onManifestReceived, this, _1),
bind(&SequentialDataFetcher::onDataRetrievalFailure, this, _1, _2));
- m_manager->processEvents();
}
}
@@ -95,26 +92,37 @@
bind(&SequentialDataFetcher::onDataPacketReceived, this, _1),
bind(&SequentialDataFetcher::onDataRetrievalFailure, this, _1, _2));
}
- m_manager->processEvents();
}
void
SequentialDataFetcher::implementSequentialLogic() {
- std::vector<ndn::Name> returnedNames;
- returnedNames = this->downloadTorrentFile();
- if (returnedNames.empty()) {
- // we have downloaded the entire torrent (including manifests, data packets, etc..)
- return;
- }
- // check the first returned name whether it is the name of a file manifest or a data packet
- const Name& nameToCheck = returnedNames[0];
- if (IoUtil::findType(nameToCheck) == IoUtil::DATA_PACKET) {
- // In this case, the returned names correspond to data packets
- this->downloadPackets(returnedNames);
+ // TODO(?) Fix seeding, and implement windowing:
+ /*
+ Alex says look at ndn-cxx:
+ * fetcher with queue (with window)
+ * segment fetcher ?
+ * catchunks (pipeline?)
+ */
+ if (!m_manager->hasAllTorrentSegments()) {
+ this->downloadTorrentFile();
}
else {
- // In this case, the returned names correspond to file manifests
- this->downloadManifestFiles(returnedNames);
+ LOG_INFO << m_torrentFileName << " complete" << std::endl;
+ std::vector<ndn::Name> namesToFetch;
+ m_manager->findFileManifestsToDownload(namesToFetch);
+ if (!namesToFetch.empty()) {
+ this->downloadManifestFiles(namesToFetch);
+ }
+ else {
+ LOG_INFO << "All manifests complete" << std::endl;
+ m_manager->findAllMissingDataPackets(namesToFetch);
+ if (!namesToFetch.empty()) {
+ this->downloadPackets(namesToFetch);
+ }
+ else {
+ LOG_INFO << "All data complete" << std::endl;
+ }
+ }
}
}
@@ -126,6 +134,12 @@
}
void
+SequentialDataFetcher::onTorrentFileSegmentReceived(const std::vector<Name>& manifestNames)
+{
+ this->downloadManifestFiles(manifestNames);
+}
+
+void
SequentialDataFetcher::onManifestReceived(const std::vector<Name>& packetNames)
{
LOG_INFO << "Manifest File Received: "
diff --git a/src/sequential-data-fetcher.hpp b/src/sequential-data-fetcher.hpp
index 8687d9c..4b050db 100644
--- a/src/sequential-data-fetcher.hpp
+++ b/src/sequential-data-fetcher.hpp
@@ -56,7 +56,7 @@
* @brief Start the sequential data fetcher
*/
void
- start();
+ start(const time::milliseconds& timeout = time::milliseconds::zero());
/**
* @brief Pause the sequential data fetcher
@@ -70,11 +70,8 @@
void
resume();
- void
- seed(const time::milliseconds& timeout = time::milliseconds::zero()) const;
-
protected:
- std::vector<ndn::Name>
+ void
downloadTorrentFile();
void
@@ -95,19 +92,15 @@
virtual void
onManifestReceived(const std::vector<Name>& packetNames);
+ virtual void
+ onTorrentFileSegmentReceived(const std::vector<Name>& manifestNames);
+
private:
std::string m_dataPath;
ndn::Name m_torrentFileName;
shared_ptr<TorrentManager> m_manager;
};
-inline
-void
-SequentialDataFetcher::seed(const time::milliseconds& timeout) const
-{
- m_manager->processEvents(timeout);
-}
-
} // namespace ntorrent
} // namespace ndn
diff --git a/src/torrent-manager.cpp b/src/torrent-manager.cpp
index c976cb1..8e525e0 100644
--- a/src/torrent-manager.cpp
+++ b/src/torrent-manager.cpp
@@ -175,6 +175,7 @@
torrentName = m_torrentFileName.getSubName(1, m_torrentFileName.size() - 3);
}
+ // TODO(spyros) Get update manager working
// m_updateHandler = make_shared<UpdateHandler>(torrentName, m_keyChain,
// make_shared<StatsTable>(m_statsTable), m_face);
@@ -243,51 +244,147 @@
}
}
-std::vector<Name>
-TorrentManager::downloadTorrentFile(const std::string& path)
+shared_ptr<Name>
+TorrentManager::findTorrentFileSegmentToDownload() const
{
- // check whether we should send out an "ALIVE" Interest
- // if (m_updateHandler->needsUpdate()) {
- // m_updateHandler->sendAliveInterest(m_stats_table_iter);
- // }
- shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
- auto manifestNames = make_shared<std::vector<Name>>();
- if (searchRes == nullptr) {
- this->findFileManifestsToDownload(*manifestNames);
- if (manifestNames->empty()) {
- auto packetNames = make_shared<std::vector<Name>>();
- this->findAllMissingDataPackets(*packetNames);
- return *packetNames;
- }
- else {
- return *manifestNames;
+ // if we have no segments
+ if (m_torrentSegments.empty()) {
+ return make_shared<Name>(m_torrentFileName);
+ }
+ // otherwise just return the next segment ptr of the last segment we have
+ return m_torrentSegments.back().getTorrentFilePtr();
+}
+
+shared_ptr<Name>
+TorrentManager::findManifestSegmentToDownload(const Name& manifestName) const
+{
+ //sequentially find whether we have downloaded any segments of this manifest file
+ Name manifestPrefix = manifestName.getSubName(0, manifestName.size() - 2);
+ auto it = std::find_if(m_fileManifests.rbegin(), m_fileManifests.rend(),
+ [&manifestPrefix] (const FileManifest& f) {
+ return manifestPrefix.isPrefixOf(f.getName());
+ });
+
+ // if we do not have any segments of the file manifest
+ if (it == m_fileManifests.rend()) {
+ return make_shared<Name>(manifestName);
+ }
+
+ // if we already have the requested segment of the file manifest
+ if (it->submanifest_number() >= manifestName.get(manifestName.size() - 2).toSequenceNumber()) {
+ return it->submanifest_ptr();
+ }
+ // if we do not have the requested segment
+ else {
+ return make_shared<Name>(manifestName);
+ }
+}
+
+void
+TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames) const
+{
+ std::vector<Name> manifests;
+ // insert the first segment name of all the file manifests to the vector
+ for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
+ manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
+ }
+ // for each file
+ for (const auto& manifestName : manifests) {
+ // find the first (if any) segment we are missing
+ shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
+ if (nullptr != manifestSegmentName) {
+ manifestNames.push_back(*manifestSegmentName);
}
}
- this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
- false, {}, {});
- return *manifestNames;
+}
+
+bool
+TorrentManager::hasDataPacket(const Name& dataName) const
+{
+
+ auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
+ [&dataName](const FileManifest& m) {
+ return m.getName().isPrefixOf(dataName);
+ });
+
+ // if we do not have the file manifest, just return false
+ if (manifest_it == m_fileManifests.end()) {
+ return false;
+ }
+
+ // find the pair of (std::shared_ptr<fs::fstream>, std::vector<bool>)
+ // that corresponds to the specific submanifest
+ auto fileState_it = m_fileStates.find(manifest_it->getFullName());
+ if (m_fileStates.end() != fileState_it) {
+ const auto& fileState = fileState_it->second;
+ auto dataNum = dataName.get(dataName.size() - 2).toSequenceNumber();
+ // find whether we have the requested packet from the bitmap
+ return fileState.second[dataNum];
+ }
+ return false;
+}
+
+void
+TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames) const
+{
+ auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
+ [&manifestName](const FileManifest& m) {
+ return m.name().getSubName(0, m.name().size()
+ - 1).isPrefixOf(manifestName);
+ });
+
+ for (auto j = manifest_it; j != m_fileManifests.end(); j++) {
+ auto& fileState = m_fileStates[j->getFullName()];
+ for (size_t dataNum = 0; dataNum < j->catalog().size(); ++dataNum) {
+ if (!fileState.second[dataNum]) {
+ packetNames.push_back(j->catalog()[dataNum]);
+ }
+ }
+
+ // check that the next manifest in the vector refers to the next segment of the same file
+ if ((j + 1) != m_fileManifests.end() && (j+1)->file_name() != manifest_it->file_name()) {
+ break;
+ }
+ }
+}
+
+void
+TorrentManager::findAllMissingDataPackets(std::vector<Name>& packetNames) const
+{
+ for (auto j = m_fileManifests.begin(); j != m_fileManifests.end(); ++j) {
+ auto fileState_it = m_fileStates.find(j->getFullName());
+ // if we have no packets from this file
+ if (m_fileStates.end() == fileState_it) {
+ packetNames.reserve(packetNames.size() + j->catalog().size());
+ packetNames.insert(packetNames.end(), j->catalog().begin(), j->catalog().end());
+ }
+ // find the packets that we are missing
+ else {
+ const auto &fileState = fileState_it->second;
+ for (auto i = j->catalog().begin(); i != j->catalog().end(); i++) {
+ auto dataNum = i->get(i->size() - 2).toSequenceNumber();
+ if (!fileState.second[dataNum]) {
+ packetNames.push_back(*i);
+ }
+ }
+ }
+ }
}
void
TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
const std::string& path,
- std::shared_ptr<std::vector<Name>> manifestNames,
- bool async,
TorrentFileReceivedCallback onSuccess,
FailedCallback onFailed)
{
shared_ptr<Interest> interest = createInterest(name);
- auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
+ auto dataReceived = [path, onSuccess, onFailed, this]
(const Interest& interest, const Data& data) {
// Stats Table update here...
m_stats_table_iter->incrementReceivedData();
m_retries = 0;
-
- if (async) {
- manifestNames->clear();
- }
-
+ std::vector<Name> manifestNames;
TorrentFile file(data.wireEncode());
// Write the torrent file segment to disk...
@@ -295,22 +392,19 @@
// if successfully written, seed this data
seed(file);
}
-
const std::vector<Name>& manifestCatalog = file.getCatalog();
- manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
+ manifestNames.insert(manifestNames.end(), manifestCatalog.begin(), manifestCatalog.end());
shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
-
- if (async) {
- onSuccess(*manifestNames);
+ if (onSuccess) {
+ onSuccess(manifestNames);
}
if (nextSegmentPtr != nullptr) {
- this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
- async, onSuccess, onFailed);
+ this->downloadTorrentFileSegment(*nextSegmentPtr, path, onSuccess, onFailed);
}
};
- auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
+ auto dataFailed = [path, name, onSuccess, onFailed, this]
(const Interest& interest) {
++m_retries;
if (m_retries >= MAX_NUM_OF_RETRIES) {
@@ -319,17 +413,13 @@
m_stats_table_iter = m_statsTable.begin();
}
}
- if (async) {
+ if (onFailed) {
onFailed(interest.getName(), "Unknown error");
}
- this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
+ this->downloadTorrentFileSegment(name, path, onSuccess, onFailed);
};
-
+ LOG_TRACE << "Sending: " << *interest << std::endl;
m_face->expressInterest(*interest, dataReceived, dataFailed);
-
- if (!async) {
- m_face->processEvents();
- }
}
void
@@ -339,21 +429,16 @@
{
shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
auto manifestNames = make_shared<std::vector<Name>>();
- if (searchRes == nullptr) {
- this->findFileManifestsToDownload(*manifestNames);
- if (manifestNames->empty()) {
- auto packetNames = make_shared<std::vector<Name>>();
- this->findAllMissingDataPackets(*packetNames);
- onSuccess(*packetNames);
- return;
- }
- else {
- onSuccess(*manifestNames);
- return;
+ if (searchRes != nullptr) {
+ this->downloadTorrentFileSegment(*searchRes, path, onSuccess, onFailed);
+ }
+ else {
+ std::vector<Name> manifests;
+ findFileManifestsToDownload(manifests);
+ if (onSuccess) {
+ onSuccess(manifests);
}
}
- this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
- true, onSuccess, onFailed);
}
void
@@ -377,7 +462,7 @@
DataReceivedCallback onSuccess,
FailedCallback onFailed)
{
- if (this->dataAlreadyDownloaded(packetName)) {
+ if (this->hasDataPacket(packetName)) {
onSuccess(packetName);
return;
}
@@ -390,7 +475,6 @@
if(writeData(data)) {
seed(data);
}
-
// Stats Table update here...
m_stats_table_iter->incrementReceivedData();
m_retries = 0;
@@ -401,12 +485,13 @@
m_retries++;
if (m_retries >= MAX_NUM_OF_RETRIES) {
m_stats_table_iter++;
- if (m_stats_table_iter == m_statsTable.end())
+ if (m_stats_table_iter == m_statsTable.end()) {
m_stats_table_iter = m_statsTable.begin();
+ }
}
onFailed(interest.getName(), "Unknown failure");
};
-
+ LOG_TRACE << "Sending: " << *interest << std::endl;
m_face->expressInterest(*interest, dataReceived, dataFailed);
}
@@ -454,6 +539,7 @@
// TODO(msweatt) Fix this once code is merged
auto subManifestSize = m_subManifestSizes[manifest_it->file_name()];
if (IoUtil::writeData(packet, *manifest_it, subManifestSize, *fileState.first)) {
+ fileState.first->flush();
// update bitmap
fileState.second[packetNum] = true;
return true;
@@ -554,7 +640,7 @@
}
onFailed(interest.getName(), "Unknown failure");
};
-
+ LOG_TRACE << "Sending: " << *interest << std::endl;
m_face->expressInterest(*interest, dataReceived, dataFailed);
}
@@ -562,6 +648,7 @@
TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
{
// handle if it is a torrent-file
+ LOG_TRACE << "Interest Recevied: " << interest << std::endl;
const auto& interestName = interest.getName();
std::shared_ptr<Data> data = nullptr;
auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
@@ -628,144 +715,17 @@
m_face->shutdown();
}
-shared_ptr<Name>
-TorrentManager::findTorrentFileSegmentToDownload()
-{
- // if we have no segments
- if (m_torrentSegments.empty()) {
- return make_shared<Name>(m_torrentFileName);
- }
- // otherwise just return the next segment ptr of the last segment we have
- return m_torrentSegments.back().getTorrentFilePtr();
-}
-
-shared_ptr<Name>
-TorrentManager::findManifestSegmentToDownload(const Name& manifestName)
-{
- //sequentially find whether we have downloaded any segments of this manifest file
- Name manifestPrefix = manifestName.getSubName(0, manifestName.size() - 2);
- auto it = std::find_if(m_fileManifests.rbegin(), m_fileManifests.rend(),
- [&manifestPrefix] (const FileManifest& f) {
- return manifestPrefix.isPrefixOf(f.getName());
- });
-
- // if we do not have any segments of the file manifest
- if (it == m_fileManifests.rend()) {
- return make_shared<Name>(manifestName);
- }
-
- // if we already have the requested segment of the file manifest
- if (it->submanifest_number() >= manifestName.get(manifestName.size() - 2).toSequenceNumber()) {
- return it->submanifest_ptr();
- }
- // if we do not have the requested segment
- else {
- return make_shared<Name>(manifestName);
- }
-}
-
-void
-TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
-{
- std::vector<Name> manifests;
- // insert the first segment name of all the file manifests to the vector
- for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
- manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
- }
- // for each file
- for (const auto& manifestName : manifests) {
- // find the first (if any) segment we are missing
- shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
- if (nullptr != manifestSegmentName) {
- manifestNames.push_back(*manifestSegmentName);
- }
- }
-}
-
-bool
-TorrentManager::dataAlreadyDownloaded(const Name& dataName)
-{
-
- auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
- [&dataName](const FileManifest& m) {
- return m.getName().isPrefixOf(dataName);
- });
-
- // if we do not have the file manifest, just return false
- if (manifest_it == m_fileManifests.end()) {
- return false;
- }
-
- // find the pair of (std::shared_ptr<fs::fstream>, std::vector<bool>)
- // that corresponds to the specific submanifest
- auto fileState_it = m_fileStates.find(manifest_it->getFullName());
- if (m_fileStates.end() != fileState_it) {
- const auto& fileState = fileState_it->second;
- auto dataNum = dataName.get(dataName.size() - 2).toSequenceNumber();
- // find whether we have the requested packet from the bitmap
- return fileState.second[dataNum];
- }
- return false;
-}
-
-void
-TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames)
-{
- auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
- [&manifestName](const FileManifest& m) {
- return m.name().getSubName(0, m.name().size()
- - 1).isPrefixOf(manifestName);
- });
-
- for (auto j = manifest_it; j != m_fileManifests.end(); j++) {
- auto& fileState = m_fileStates[j->getFullName()];
- for (size_t dataNum = 0; dataNum < j->catalog().size(); ++dataNum) {
- if (!fileState.second[dataNum]) {
- packetNames.push_back(j->catalog()[dataNum]);
- }
- }
-
- // check that the next manifest in the vector refers to the next segment of the same file
- if ((j + 1) != m_fileManifests.end() && (j+1)->file_name() != manifest_it->file_name()) {
- break;
- }
- }
-}
-
-void
-TorrentManager::findAllMissingDataPackets(std::vector<Name>& packetNames)
-{
- for (auto j = m_fileManifests.begin(); j != m_fileManifests.end(); ++j) {
- auto fileState_it = m_fileStates.find(j->getFullName());
- // if we have no packets from this file
- if (m_fileStates.end() == fileState_it) {
- packetNames.reserve(packetNames.size() + j->catalog().size());
- packetNames.insert(packetNames.end(), j->catalog().begin(), j->catalog().end());
- }
- // find the packets that we are missing
- else {
- const auto &fileState = fileState_it->second;
- for (auto i = j->catalog().begin(); i != j->catalog().end(); i++) {
- auto dataNum = i->get(i->size() - 2).toSequenceNumber();
- if (!fileState.second[dataNum]) {
- packetNames.push_back(*i);
- }
- }
- }
- }
-}
-
shared_ptr<Interest>
TorrentManager::createInterest(Name name)
{
shared_ptr<Interest> interest = make_shared<Interest>(name);
interest->setInterestLifetime(time::milliseconds(2000));
- interest->setMustBeFresh(true);
// Select routable prefix
- Link link(name, { {1, m_stats_table_iter->getRecordName()} });
- m_keyChain->sign(link, signingWithSha256());
- Block linkWire = link.wireEncode();
+ // TODO(spyros) Fix links
+ // Link link(name, { {1, m_stats_table_iter->getRecordName()} });
+ // m_keyChain->sign(link, signingWithSha256());
+ // Block linkWire = link.wireEncode();
// Stats Table update here...
m_stats_table_iter->incrementSentInterests();
@@ -784,7 +744,7 @@
m_retries = 0;
}
- interest->setLink(linkWire);
+ // interest->setLink(linkWire);
return interest;
}
diff --git a/src/torrent-manager.hpp b/src/torrent-manager.hpp
index 4a8beb4..a3197d1 100644
--- a/src/torrent-manager.hpp
+++ b/src/torrent-manager.hpp
@@ -80,14 +80,71 @@
void
Initialize();
+ /**
+ * brief Return 'true' if all segments of the torrent file downloaded, 'false' otherwise.
+ */
+ bool
+ hasAllTorrentSegments() const;
+
/*
- * @brief Download the torrent file
- * @param path The path to write the downloaded segments
- * @return A vector of the file manifest names contained in the torrent file
+ * \brief Given a data packet name, find whether we have already downloaded this packet
+ * @param dataName The name of the data packet to download
+ * @return True if we have already downloaded this packet, false otherwise
*
*/
- std::vector<Name>
- downloadTorrentFile(const std::string& path);
+ bool
+ hasDataPacket(const Name& dataName) const;
+
+ /*
+ * \brief Find the torrent file segment that we should download
+ * (either we have nothing or we have them all)
+ * @return A shared_ptr to the name of the segment to download or
+ * nullptr if we have all the segments
+ *
+ */
+ shared_ptr<Name>
+ findTorrentFileSegmentToDownload() const;
+
+ /*
+ * \brief Given a file manifest segment name, find the next file manifest segment
+ * that we should download
+ * @param manifestName The name of the file manifest segment that we want to download
+ * @return A shared_ptr to the name of the segment to download or
+ * nullptr if we have all the segments
+ *
+ */
+ shared_ptr<Name>
+ findManifestSegmentToDownload(const Name& manifestName) const;
+
+ /*
+ * \brief Find the segments of all the file manifests that we are missing
+ * @param manifestNames The name of the manifest file segments to download (currently missing)
+ * This parameter is used as an output vector of names
+ */
+ void
+ findFileManifestsToDownload(std::vector<Name>& manifestNames) const;
+
+ /*
+ * \brief Find the names of the data packets of a file manifest that we are currently missing
+ * @param manifestName The name of the manifest
+ * @param packetNames The name of the data packets to be downloaded
+ * (used as an output vector of names)
+ *
+ * No matter what segment of a manifest file the manifestName parameter might refer to, the
+ * missing data packets starting from the first segment of this manifest file would be returned
+ *
+ */
+ void
+ findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames) const;
+
+ /*
+ * \brief Find all the data packets that we are currently missing
+ * @param packetNames The name of the data packets to be downloaded
+ * (used as an output vector of names)
+ *
+ */
+ void
+ findAllMissingDataPackets(std::vector<Name>& packetNames) const;
/*
* @brief Download the torrent file
@@ -100,12 +157,11 @@
* failed to download and a failure reason to the callback
*
* This method provides non-blocking downloading of all the torrent file segments
- *
*/
void
downloadTorrentFile(const std::string& path,
- TorrentFileReceivedCallback onSuccess,
- FailedCallback onFailed);
+ TorrentFileReceivedCallback onSuccess = {},
+ FailedCallback onFailed = {});
/*
* @brief Download a file manifest
@@ -149,10 +205,10 @@
/**
* @brief Process any data to receive or call timeout callbacks and update prefix list (if needed)
- * By default only process pending events and return immediately, optionally specify a timeout.
+ * By default this blocks until all operations are complete.
*/
void
- processEvents(const time::milliseconds& timeout = time::milliseconds(-1));
+ processEvents(const time::milliseconds& timeout = time::milliseconds(0));
protected:
/**
@@ -192,23 +248,15 @@
* \brief Download the segments of the torrent file
* @param name The name of the torrent file to be downloaded
* @param path The path to write the torrent file on disk
- * @param manifestNames A vector containing the name of the manifests in the torrent file.
- * This parameter will be updated every time we receive a torrent
- * file segment
- * @param async Blocking (sync) or non-blocking (async) operation
* @param onSuccess Optional callback to be called when all the segments of the torrent file
- * have been downloaded. The default value is an empty callack. A callback
- * should be specified when async is false
+ * have been downloaded. The default value is an empty callback.
* @param onFailed Optional callback to be called when we fail to download a segment of the
- * torrent file. The default value is an empty callack. A callback should be
- * specified when async is false
+ * torrent file. The default value is an empty callback.
*
*/
void
downloadTorrentFileSegment(const ndn::Name& name,
const std::string& path,
- std::shared_ptr<std::vector<Name>> manifestNames,
- bool async,
TorrentFileReceivedCallback onSuccess,
FailedCallback onFailed);
@@ -244,6 +292,7 @@
void
onRegisterFailed(const Name& prefix, const std::string& reason);
+protected:
// A map from each fileManifest to corresponding file stream on disk and a bitmap of which Data
// packets this manager currently has
mutable std::unordered_map<Name,
@@ -260,67 +309,6 @@
// The path to the location on disk of the Data packet for this manager
std::string m_dataPath;
-protected:
- /*
- * \brief Find the torrent file segment that we should download
- * (either we have nothing or we have them all)
- * @return A shared_ptr to the name of the segment to download or
- * nullptr if we have all the segments
- *
- */
- shared_ptr<Name>
- findTorrentFileSegmentToDownload();
-
- /*
- * \brief Given a file manifest segment name, find the next file manifest segment
- * that we should download
- * @param manifestName The name of the file manifest segment that we want to download
- * @return A shared_ptr to the name of the segment to download or
- * nullptr if we have all the segments
- *
- */
- shared_ptr<Name>
- findManifestSegmentToDownload(const Name& manifestName);
-
- /*
- * \brief Given a data packet name, find whether we have already downloaded this packet
- * @param dataName The name of the data packet to download
- * @return True if we have already downloaded this packet, false otherwise
- *
- */
- bool
- dataAlreadyDownloaded(const Name& dataName);
-
- /*
- * \brief Find the segments of all the file manifests that we are missing
- * @param manifestNames The name of the manifest file segments to download (currently missing)
- * This parameter is used as an output vector of names
- */
- void
- findFileManifestsToDownload(std::vector<Name>& manifestNames);
-
- /*
- * \brief Find the names of the data packets of a file manifest that we are currently missing
- * @param manifestName The name of the manifest
- * @param packetNames The name of the data packets to be downloaded
- * (used as an output vector of names)
- *
- * No matter what segment of a manifest file the manifestName parameter might refer to, the
- * missing data packets starting from the first segment of this manifest file would be returned
- *
- */
- void
- findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames);
-
- /*
- * \brief Find all the data packets that we are currently missing
- * @param packetNames The name of the data packets to be downloaded
- * (used as an output vector of names)
- *
- */
- void
- findAllMissingDataPackets(std::vector<Name>& packetNames);
-
private:
shared_ptr<Interest>
createInterest(Name name);
@@ -337,8 +325,9 @@
uint64_t m_sortingCounter;
// Keychain instance
shared_ptr<KeyChain> m_keyChain;
- // Update Handler instance
- shared_ptr<UpdateHandler> m_updateHandler;
+ // TODO(spyros) Fix and reintegrate update handler
+ // // Update Handler instance
+ // shared_ptr<UpdateHandler> m_updateHandler;
};
inline
@@ -373,6 +362,13 @@
m_face->processEvents(timeout);
}
+inline
+bool
+TorrentManager::hasAllTorrentSegments() const
+{
+ return findTorrentFileSegmentToDownload() == nullptr;
+}
+
} // end ntorrent
} // end ndn
diff --git a/tests/unit-tests/torrent-manager.t.cpp b/tests/unit-tests/torrent-manager.t.cpp
index d0f5110..7151aec 100644
--- a/tests/unit-tests/torrent-manager.t.cpp
+++ b/tests/unit-tests/torrent-manager.t.cpp
@@ -78,10 +78,6 @@
return TorrentManager::findManifestSegmentToDownload(manifestName);
}
- bool dataAlreadyDownloaded(const Name& name) {
- return TorrentManager::dataAlreadyDownloaded(name);
- }
-
std::vector<bool> fileState(const ndn::Name& manifestName) {
auto fout = m_fileStates[manifestName].first;
if (nullptr != fout) {
@@ -212,6 +208,7 @@
face);
manager.Initialize();
+ BOOST_CHECK(manager.hasAllTorrentSegments());
advanceClocks(time::milliseconds(1), 10);
manager.sendRoutablePrefixResponse();
@@ -241,6 +238,8 @@
advanceClocks(time::milliseconds(1), 10);
manager.sendRoutablePrefixResponse();
+ BOOST_CHECK(!manager.hasAllTorrentSegments());
+
BOOST_CHECK(manager.torrentSegments() == vector<TorrentFile>());
BOOST_CHECK(manager.fileManifests() == vector<FileManifest>());
}
@@ -295,6 +294,7 @@
face);
manager.Initialize();
+ BOOST_CHECK(manager.hasAllTorrentSegments());
advanceClocks(time::milliseconds(1), 10);
manager.sendRoutablePrefixResponse();
@@ -370,6 +370,7 @@
face);
manager.Initialize();
+ BOOST_CHECK(manager.hasAllTorrentSegments());
advanceClocks(time::milliseconds(1), 10);
manager.sendRoutablePrefixResponse();
@@ -416,6 +417,7 @@
filePath, face);
manager.Initialize();
+ BOOST_CHECK(!manager.hasAllTorrentSegments());
advanceClocks(time::milliseconds(1), 10);
manager.sendRoutablePrefixResponse();
@@ -440,7 +442,7 @@
advanceClocks(time::milliseconds(1), 40);
face->receive(dynamic_cast<Data&>(*i));
}
-
+ BOOST_CHECK(manager.hasAllTorrentSegments());
fs::remove_all(filePath);
fs::remove_all(".appdata");
}
@@ -602,7 +604,8 @@
BOOST_CHECK(!(manager.findTorrentFileSegmentToDownload()));
std::vector<Name> manifests;
- manifests = manager.downloadTorrentFile("/test");
+ manager.downloadTorrentFile("/test");
+ manager.findFileManifestsToDownload(manifests);
BOOST_CHECK_EQUAL(manifests[0].toUri(), "/manifest1");
BOOST_CHECK_EQUAL(manifests[1].toUri(), "/manifest2");
@@ -633,96 +636,6 @@
fs::remove_all(".appdata");
}
-// we have the torrent file and the manifests
-BOOST_AUTO_TEST_CASE(TestFindTorrentFileSegmentToDownload3)
-{
- vector<FileManifest> manifests;
- vector<TorrentFile> torrentSegments;
- // for each file, the data packets
- std::vector<vector<Data>> fileData;
- std::string filePath = "tests/testdata/temp";
- // get torrent files and manifests
- {
- auto temp = TorrentFile::generate("tests/testdata/foo",
- 1024,
- 2048,
- 8192,
- true);
- torrentSegments = temp.first;
- auto temp1 = temp.second;
- for (const auto& ms : temp1) {
- manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
- fileData.push_back(ms.second);
- }
- }
- // write the torrent segments and manifests to disk
- std::string dirPath = ".appdata/foo/";
- boost::filesystem::create_directories(dirPath);
- std::string torrentPath = dirPath + "torrent_files/";
- boost::filesystem::create_directories(torrentPath);
- auto fileNum = 0;
- for (const auto& t : torrentSegments) {
- fileNum++;
- auto filename = torrentPath + to_string(fileNum);
- io::save(t, filename);
- }
-
- auto manifestPath = dirPath + "manifests/";
- boost::filesystem::create_directory(manifestPath);
- for (const auto& m : manifests) {
- fs::path filename = manifestPath + m.file_name() + to_string(m.submanifest_number());
- boost::filesystem::create_directory(filename.parent_path());
- io::save(m, filename.string());
- }
- // Initialize manager
- TestTorrentManager manager("/NTORRENT/foo/torrent-file/sha256digest=a8a2e98cd943d895b8c4b12a208343bcf9344ce85a6376dc6f5754fe8f4a573e",
- filePath,
- face);
-
- manager.Initialize();
-
- advanceClocks(time::milliseconds(1), 10);
- manager.sendRoutablePrefixResponse();
-
- // Set the file state
- std::vector<bool> v1 = {true};
- manager.setFileState(manifests[0].getFullName(), make_shared<fs::fstream>(), v1);
-
- std::vector<bool> v2 = {false, true, true, false, false, false};
- manager.setFileState(manifests[1].getFullName(), make_shared<fs::fstream>(), v2);
-
- std::vector<bool> v3 = {true, false, false, false, false, false};
- manager.setFileState(manifests[2].getFullName(), make_shared<fs::fstream>(), v3);
-
- manager.downloadTorrentFile(filePath + "torrent_files/",
- [&manifests](const std::vector<ndn::Name>& vec) {
- BOOST_CHECK_EQUAL(vec[0].toUri(),
- manifests[1].catalog()[0].toUri());
- BOOST_CHECK_EQUAL(vec[1].toUri(),
- manifests[1].catalog()[3].toUri());
- BOOST_CHECK_EQUAL(vec[2].toUri(),
- manifests[1].catalog()[4].toUri());
- BOOST_CHECK_EQUAL(vec[3].toUri(),
- manifests[1].catalog()[5].toUri());
- BOOST_CHECK_EQUAL(vec[4].toUri(),
- manifests[2].catalog()[1].toUri());
- BOOST_CHECK_EQUAL(vec[5].toUri(),
- manifests[2].catalog()[2].toUri());
- BOOST_CHECK_EQUAL(vec[6].toUri(),
- manifests[2].catalog()[3].toUri());
- BOOST_CHECK_EQUAL(vec[7].toUri(),
- manifests[2].catalog()[4].toUri());
- BOOST_CHECK_EQUAL(vec[8].toUri(),
- manifests[2].catalog()[5].toUri());
- },
- [](const ndn::Name& name, const std::string& reason) {
- BOOST_FAIL("Unexpected failure");
- });
-
- fs::remove_all(filePath);
- fs::remove_all(".appdata");
-}
-
BOOST_AUTO_TEST_CASE(TestFindManifestSegmentToDownload1)
{
std::string filePath = ".appdata/foo/";
@@ -946,6 +859,7 @@
face);
manager.Initialize();
+ BOOST_CHECK(manager.hasAllTorrentSegments());
advanceClocks(time::milliseconds(1), 10);
manager.sendRoutablePrefixResponse();
@@ -965,14 +879,14 @@
p1.appendSequenceNumber(0);
p1 = Name(p1.toUri() + "/sha256digest");
- BOOST_CHECK(!(manager.dataAlreadyDownloaded(p1)));
+ BOOST_CHECK(!(manager.hasDataPacket(p1)));
Name p2("NTORRENT/foo/bar.txt");
p2.appendSequenceNumber(0);
p2.appendSequenceNumber(0);
p2 = Name(p2.toUri() + "/sha256digest");
- BOOST_CHECK(manager.dataAlreadyDownloaded(p2));
+ BOOST_CHECK(manager.hasDataPacket(p2));
}
BOOST_AUTO_TEST_CASE(CheckSeedComplete)
@@ -1061,7 +975,7 @@
}));
advanceClocks(time::milliseconds(1), 40);
face->receive(interest);
- manager.processEvents();
+ manager.processEvents(time::milliseconds(-1));
// check that one piece of data is sent, and it is what was expected
BOOST_CHECK_EQUAL(++nData, face->sentData.size());
face->receive(face->sentData[nData - 1]);
@@ -1083,7 +997,7 @@
}));
advanceClocks(time::milliseconds(1), 40);
face->receive(interest);
- manager.processEvents();
+ manager.processEvents(time::milliseconds(-1));
// check that one piece of data is sent, and it is what was expected
BOOST_CHECK_EQUAL(++nData, face->sentData.size());
face->receive(face->sentData[nData - 1]);
@@ -1104,7 +1018,7 @@
}));
advanceClocks(time::milliseconds(1), 40);
face->receive(interest);
- manager.processEvents();
+ manager.processEvents(time::milliseconds(-1));
// check that one piece of data is sent, and it is what was expected
BOOST_CHECK_EQUAL(++nData, face->sentData.size());
face->receive(face->sentData[nData - 1]);
@@ -1188,7 +1102,7 @@
}));
advanceClocks(time::milliseconds(1), 40);
face->receive(interest);
- manager.processEvents();
+ manager.processEvents(time::milliseconds(-1));
// check that one piece of data is sent, and it is what was expected
BOOST_CHECK_EQUAL(++nData, face->sentData.size());
face->receive(face->sentData[nData - 1]);