Implemented TorrentManger::seed. Fixed bug in TorrentManger::Initialize of using incorrect
submanifest size. Migrate torrent-manager to use shared_ptr to face rather than a dangling
reference. Lastly, I added more tests throughout and reorganized the code to match the order of
declaration.
Change-Id: Ia427d8047ff6e69d5f274482d0d9a17e953e1f0d
diff --git a/src/torrent-file.cpp b/src/torrent-file.cpp
index d7918d2..30957bf 100644
--- a/src/torrent-file.cpp
+++ b/src/torrent-file.cpp
@@ -239,6 +239,7 @@
size_t dataPacketSize,
bool returnData)
{
+ //TODO(spyros) Adapt this support subdirectories in 'directoryPath'
BOOST_ASSERT(0 < namesPerSegment);
std::vector<TorrentFile> torrentSegments;
diff --git a/src/torrent-manager.cpp b/src/torrent-manager.cpp
index 4a0eaff..155d7cf 100644
--- a/src/torrent-manager.cpp
+++ b/src/torrent-manager.cpp
@@ -127,13 +127,47 @@
return packets;
}
+// =================================================================================================
+// Torrent Manager Utility Functions
+// ==================================================================================================
+
+static std::shared_ptr<Data>
+readDataPacket(const Name& packetFullName,
+ const FileManifest& manifest,
+ size_t subManifestSize,
+ fs::fstream& is) {
+ auto dataPacketSize = manifest.data_packet_size();
+ auto start_offset = manifest.submanifest_number() * subManifestSize * dataPacketSize;
+ auto packetNum = packetFullName.get(packetFullName.size() - 2).toSequenceNumber();
+ // seek to packet
+ is.sync();
+ is.seekg(start_offset + packetNum * dataPacketSize);
+ if (is.tellg() < 0) {
+ std::cerr << "bad seek" << std::endl;
+ }
+ // read contents
+ std::vector<char> bytes(dataPacketSize);
+ is.read(&bytes.front(), dataPacketSize);
+ auto read_size = is.gcount();
+ if (is.bad() || read_size < 0) {
+ std::cerr << "Bad read" << std::endl;
+ return nullptr;
+ }
+ // construct packet
+ auto packetName = packetFullName.getSubName(0, packetFullName.size() - 1);
+ auto d = make_shared<Data>(packetName);
+ d->setContent(encoding::makeBinaryBlock(tlv::Content, &bytes.front(), read_size));
+ ndn::security::KeyChain key_chain;
+ key_chain.sign(*d, signingWithSha256());
+ return d->getFullName() == packetFullName ? d : nullptr;
+}
+
static vector<TorrentFile>
intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
{
security::KeyChain key_chain;
Name currSegmentFullName = initialSegmentName;
vector<TorrentFile> torrentSegments = load_directory<TorrentFile>(torrentFilePath);
-
// Starting with the initial segment name, verify the names, loading next name from torrentSegment
for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
TorrentFile& segment = *it;
@@ -153,85 +187,75 @@
}
static vector<FileManifest>
-intializeFileManifests(const string& manifestPath, vector<TorrentFile> torrentSegments)
+intializeFileManifests(const string& manifestPath, const vector<TorrentFile>& torrentSegments)
{
security::KeyChain key_chain;
vector<FileManifest> manifests = load_directory<FileManifest>(manifestPath);
-
+ if (manifests.empty()) {
+ return manifests;
+ }
// sign the manifests
std::for_each(manifests.begin(), manifests.end(),
[&key_chain](FileManifest& m){
key_chain.sign(m,signingWithSha256());
});
- // put all names of manifests from the valid torrent files into a set
- std::set<ndn::Name> validManifestNames;
+ // put all names of initial manifests from the valid torrent files into a set
+ std::vector<ndn::Name> validInitialManifestNames;
for (const auto& segment : torrentSegments) {
const auto& catalog = segment.getCatalog();
- validManifestNames.insert(catalog.begin(), catalog.end());
+ validInitialManifestNames.insert(validInitialManifestNames.end(),
+ catalog.begin(),
+ catalog.end());
}
+ auto manifest_it = manifests.begin();
+ std::vector<FileManifest> output;
+ output.reserve(manifests.size());
+ auto validIvalidInitialManifestNames_it = validInitialManifestNames.begin();
- // put all names of file manifests from disk into a set
- std::set<ndn::Name> loadedManifestNames;
- std::for_each(manifests.begin(), manifests.end(),
- [&loadedManifestNames](const FileManifest& m){
- loadedManifestNames.insert(m.getFullName());
- });
-
- // the set of fileManifests that we have is simply the intersection
- std::set<Name> output;
- std::set_intersection(validManifestNames.begin() , validManifestNames.end(),
- loadedManifestNames.begin(), loadedManifestNames.end(),
- std::inserter(output, output.begin()));
-
- // filter out those manifests that are not in this set
- std::remove_if(manifests.begin(),
- manifests.end(),
- [&output](const FileManifest& m) {
- return (output.end() == output.find(m.name()));
- });
-
- // order the manifests in the same order they are in the torrent
- std::vector<Name> catalogNames;
- for (const auto& segment : torrentSegments) {
- const auto& catalog = segment.getCatalog();
- catalogNames.insert(catalogNames.end(), catalog.begin(), catalog.end());
- }
- size_t curr_index = 0;
- for (auto name : catalogNames) {
- auto it = std::find_if(manifests.begin(), manifests.end(),
- [&name](const FileManifest& m) {
- return m.getFullName() == name;
- });
- if (it != manifests.end()) {
- // not already in the correct position
- if (it != manifests.begin() + curr_index) {
- std::swap(manifests[curr_index], *it);
+ for (auto& initialName : validInitialManifestNames) {
+ // starting from the initial segment
+ auto& validName = initialName;
+ if (manifests.end() == manifest_it) {
+ break;
+ }
+ auto fileName = manifest_it->file_name();
+ // sequential collect all valid segments
+ while (manifest_it != manifests.end() && manifest_it->getFullName() == validName) {
+ output.push_back(*manifest_it);
+ if (manifest_it->submanifest_ptr() != nullptr) {
+ validName = *manifest_it->submanifest_ptr();
+ ++manifest_it;
}
- ++curr_index;
+ else {
+ ++manifest_it;
+ break;
+ }
+ }
+ // skip the remain segments for this file (all invalid)
+ while (manifests.end() != manifest_it && manifest_it->file_name() == fileName) {
+ ++manifest_it;
}
}
-
- return manifests;
+ return output;
}
static vector<Data>
-intializeDataPackets(const string& filePath,
- const FileManifest manifest,
- const TorrentFile& torrentFile)
+initializeDataPackets(const string& filePath,
+ const FileManifest manifest,
+ size_t subManifestSize)
{
vector<Data> packets;
auto subManifestNum = manifest.submanifest_number();
packets = packetize_file(filePath,
- manifest.name(),
- manifest.data_packet_size(),
- manifest.catalog().size(),
- subManifestNum);
+ manifest.name(),
+ manifest.data_packet_size(),
+ subManifestSize,
+ subManifestNum);
auto catalog = manifest.catalog();
-
// Filter out invalid packet names
std::remove_if(packets.begin(), packets.end(),
[&packets, &catalog](const Data& p) {
@@ -243,23 +267,36 @@
}
static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
-initializeFileState(const string& dataPath,
- const FileManifest& manifest)
+initializeFileState(const string& dataPath,
+ const FileManifest& manifest,
+ size_t subManifestSize)
{
// construct the file name
auto fileName = manifest.file_name();
auto filePath = dataPath + fileName;
vector<bool> fileBitMap(manifest.catalog().size());
- auto fbits = fs::fstream::out | fs::fstream::binary;
- // if file exists, use in O/W use concatenate mode
- fbits |= fs::exists(filePath) ? fs::fstream::in : fs::fstream::ate;
- auto s = std::make_shared<fs::fstream>(filePath, fbits);
- if (!*s) {
- BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + dataPath));
+ // if the file does not exist, create an empty placeholder (otherwise cannot set read-bit)
+ if (!fs::exists(filePath)) {
+ fs::ofstream fs(filePath);
+ fs << "";
}
+ auto s = std::make_shared<fs::fstream>(filePath,
+ fs::fstream::out
+ | fs::fstream::binary
+ | fs::fstream::in);
+ if (!*s) {
+ BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + filePath));
+ }
+ auto start_offset = manifest.submanifest_number() * subManifestSize * manifest.data_packet_size();
+ s->seekg(start_offset);
+ s->seekp(start_offset);
return std::make_pair(s, fileBitMap);
}
+//==================================================================================================
+// TorrentManager Implementation
+//==================================================================================================
+
void TorrentManager::Initialize()
{
// .../<torrent_name>/torrent-file/<implicit_digest>
@@ -276,30 +313,38 @@
return;
}
m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
- auto currTorrentFile_it = m_torrentSegments.begin();
+
+ // get the submanifest sizes
for (const auto& m : m_fileManifests) {
- // find the appropriate torrent file
- auto currCatalog = currTorrentFile_it->getCatalog();
- while (currCatalog.end() == std::find(currCatalog.begin(), currCatalog.end(), m.getFullName()))
- {
- ++currTorrentFile_it;
- currCatalog = currTorrentFile_it->getCatalog();
+ if (m.submanifest_number() == 0) {
+ auto manifestFileName = m.file_name();
+ m_subManifestSizes[manifestFileName] = m.catalog().size();
}
+ }
+
+ for (const auto& m : m_fileManifests) {
// construct the file name
auto fileName = m.file_name();
fs::path filePath = m_dataPath + fileName;
// If there are any valid packets, add corresponding state to manager
if (!fs::exists(filePath)) {
- boost::filesystem::create_directories(filePath.parent_path());
+ if (!fs::exists(filePath.parent_path())) {
+ boost::filesystem::create_directories(filePath.parent_path());
+ }
continue;
}
- auto packets = intializeDataPackets(filePath.string(), m, *currTorrentFile_it);
+ auto packets = initializeDataPackets(filePath.string(), m, m_subManifestSizes[m.file_name()]);
if (!packets.empty()) {
- m_fileStates[m.getFullName()] = initializeFileState(m_dataPath, m);
+ m_fileStates[m.getFullName()] = initializeFileState(m_dataPath,
+ m,
+ m_subManifestSizes[m.file_name()]);
auto& fileBitMap = m_fileStates[m.getFullName()].second;
auto read_it = packets.begin();
size_t i = 0;
for (auto name : m.catalog()) {
+ if (read_it == packets.end()) {
+ break;
+ }
if (name == read_it->getFullName()) {
++read_it;
fileBitMap[i] = true;
@@ -315,10 +360,184 @@
seed(t);
}
for (const auto& m : m_fileManifests) {
- seed(m);
+ seed(m);
}
}
+std::vector<Name>
+TorrentManager::downloadTorrentFile(const std::string& path)
+{
+ shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
+ auto manifestNames = make_shared<std::vector<Name>>();
+ if (searchRes == nullptr) {
+ this->findFileManifestsToDownload(*manifestNames);
+ if (manifestNames->empty()) {
+ auto packetNames = make_shared<std::vector<Name>>();
+ this->findAllMissingDataPackets(*packetNames);
+ return *packetNames;
+ }
+ else {
+ return *manifestNames;
+ }
+ }
+ this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
+ false, {}, {});
+ return *manifestNames;
+}
+
+void
+TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
+ const std::string& path,
+ std::shared_ptr<std::vector<Name>> manifestNames,
+ bool async,
+ TorrentFileReceivedCallback onSuccess,
+ FailedCallback onFailed)
+{
+ shared_ptr<Interest> interest = createInterest(name);
+
+ auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
+ (const Interest& interest, const Data& data) {
+ // Stats Table update here...
+ m_stats_table_iter->incrementReceivedData();
+ m_retries = 0;
+
+ if (async) {
+ manifestNames->clear();
+ }
+
+ TorrentFile file(data.wireEncode());
+
+ // Write the torrent file segment to disk...
+ if (writeTorrentSegment(file, path)) {
+ // if successfully written, seed this data
+ seed(data);
+ }
+
+ const std::vector<Name>& manifestCatalog = file.getCatalog();
+ manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
+
+ shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
+
+ if (async) {
+ onSuccess(*manifestNames);
+ }
+ if (nextSegmentPtr != nullptr) {
+ this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
+ async, onSuccess, onFailed);
+ }
+ };
+
+ auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
+ (const Interest& interest) {
+ ++m_retries;
+ if (m_retries >= MAX_NUM_OF_RETRIES) {
+ ++m_stats_table_iter;
+ if (m_stats_table_iter == m_statsTable.end()) {
+ m_stats_table_iter = m_statsTable.begin();
+ }
+ }
+ if (async) {
+ onFailed(interest.getName(), "Unknown error");
+ }
+ this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
+ };
+
+ m_face->expressInterest(*interest, dataReceived, dataFailed);
+
+ if (!async) {
+ m_face->processEvents();
+ }
+}
+
+void
+TorrentManager::downloadTorrentFile(const std::string& path,
+ TorrentFileReceivedCallback onSuccess,
+ FailedCallback onFailed)
+{
+ shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
+ auto manifestNames = make_shared<std::vector<Name>>();
+ if (searchRes == nullptr) {
+ this->findFileManifestsToDownload(*manifestNames);
+ if (manifestNames->empty()) {
+ auto packetNames = make_shared<std::vector<Name>>();
+ this->findAllMissingDataPackets(*packetNames);
+ onSuccess(*packetNames);
+ return;
+ }
+ else {
+ onSuccess(*manifestNames);
+ return;
+ }
+ }
+ this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
+ true, onSuccess, onFailed);
+}
+
+void
+TorrentManager::download_file_manifest(const Name& manifestName,
+ const std::string& path,
+ TorrentManager::ManifestReceivedCallback onSuccess,
+ TorrentManager::FailedCallback onFailed)
+{
+ shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
+ auto packetNames = make_shared<std::vector<Name>>();
+ if (searchRes == nullptr) {
+ this->findDataPacketsToDownload(manifestName, *packetNames);
+ onSuccess(*packetNames);
+ return;
+ }
+ this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
+}
+
+void
+TorrentManager::download_data_packet(const Name& packetName,
+ DataReceivedCallback onSuccess,
+ FailedCallback onFailed)
+{
+ if (this->dataAlreadyDownloaded(packetName)) {
+ onSuccess(packetName);
+ return;
+ }
+
+ shared_ptr<Interest> interest = this->createInterest(packetName);
+
+ auto dataReceived = [onSuccess, onFailed, this]
+ (const Interest& interest, const Data& data) {
+ // Write data to disk...
+ if(writeData(data)) {
+ seed(data);
+ }
+
+ // Stats Table update here...
+ m_stats_table_iter->incrementReceivedData();
+ m_retries = 0;
+ onSuccess(data.getName());
+ };
+ auto dataFailed = [onFailed, this]
+ (const Interest& interest) {
+ m_retries++;
+ if (m_retries >= MAX_NUM_OF_RETRIES) {
+ m_stats_table_iter++;
+ if (m_stats_table_iter == m_statsTable.end())
+ m_stats_table_iter = m_statsTable.begin();
+ }
+ onFailed(interest.getName(), "Unknown failure");
+ };
+
+ m_face->expressInterest(*interest, dataReceived, dataFailed);
+}
+
+void TorrentManager::seed(const Data& data) {
+ m_face->setInterestFilter(data.getFullName(),
+ bind(&TorrentManager::onInterestReceived, this, _1, _2),
+ RegisterPrefixSuccessCallback(),
+ bind(&TorrentManager::onRegisterFailed, this, _1, _2));
+}
+
+// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
+// Protected Helpers
+// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
+
bool TorrentManager::writeData(const Data& packet)
{
// find correct manifest
@@ -332,18 +551,28 @@
}
// get file state out
auto& fileState = m_fileStates[manifest_it->getFullName()];
+
// if there is no open stream to the file
if (nullptr == fileState.first) {
- fileState = initializeFileState(m_dataPath, *manifest_it);
+ fs::path filePath = m_dataPath + manifest_it->file_name();
+ if (!fs::exists(filePath)) {
+ fs::create_directories(filePath.parent_path());
+ }
+
+ fileState = initializeFileState(m_dataPath,
+ *manifest_it,
+ m_subManifestSizes[manifest_it->file_name()]);
}
auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
// if we already have the packet, do not rewrite it.
if (fileState.second[packetNum]) {
return false;
}
- auto packetOffset = packetNum * manifest_it->data_packet_size();
+ auto dataPacketSize = manifest_it->data_packet_size();
+ auto initial_offset = manifest_it->submanifest_number() * m_subManifestSizes[manifest_it->file_name()] * dataPacketSize;
+ auto packetOffset = initial_offset + packetNum * dataPacketSize;
// write data to disk
- fileState.first->seekg(packetOffset);
+ fileState.first->seekp(packetOffset);
try {
auto content = packet.getContent();
std::vector<char> data(content.value_begin(), content.value_end());
@@ -419,144 +648,20 @@
}
io::save(manifest, filename.string());
// add to collection
- // add to collection
auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
[&manifest](const FileManifest& m){
return m.file_name() > manifest.file_name()
|| (m.file_name() == manifest.file_name()
&& (m.submanifest_number() > manifest.submanifest_number()));
});
+ // update the state of the manager
m_fileManifests.insert(it, manifest);
+ if (0 == manifest.submanifest_number()) {
+ m_subManifestSizes[manifest.file_name()] = manifest.catalog().size();
+ }
return true;
}
-void TorrentManager::seed(const Data& data) const {
- // TODO(msweatt) IMPLEMENT ME
-}
-
-void
-TorrentManager::downloadTorrentFile(const std::string& path,
- TorrentFileReceivedCallback onSuccess,
- FailedCallback onFailed)
-{
- shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
- auto manifestNames = make_shared<std::vector<Name>>();
- if (searchRes == nullptr) {
- this->findFileManifestsToDownload(*manifestNames);
- if (manifestNames->empty()) {
- auto packetNames = make_shared<std::vector<Name>>();
- this->findAllMissingDataPackets(*packetNames);
- onSuccess(*packetNames);
- return;
- }
- else {
- onSuccess(*manifestNames);
- return;
- }
- }
- this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
- true, onSuccess, onFailed);
-}
-
-std::vector<Name>
-TorrentManager::downloadTorrentFile(const std::string& path)
-{
- shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
- auto manifestNames = make_shared<std::vector<Name>>();
- if (searchRes == nullptr) {
- this->findFileManifestsToDownload(*manifestNames);
- if (manifestNames->empty()) {
- auto packetNames = make_shared<std::vector<Name>>();
- this->findAllMissingDataPackets(*packetNames);
- return *packetNames;
- }
- else {
- return *manifestNames;
- }
- }
- this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
- false, {}, {});
- return *manifestNames;
-}
-
-void
-TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
- const std::string& path,
- std::shared_ptr<std::vector<Name>> manifestNames,
- bool async,
- TorrentFileReceivedCallback onSuccess,
- FailedCallback onFailed)
-{
- shared_ptr<Interest> interest = createInterest(name);
-
- auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
- (const Interest& interest, const Data& data) {
- // Stats Table update here...
- m_stats_table_iter->incrementReceivedData();
- m_retries = 0;
-
- if (async) {
- manifestNames->clear();
- }
-
- TorrentFile file(data.wireEncode());
-
- // Write the torrent file segment to disk...
- writeTorrentSegment(file, path);
-
- const std::vector<Name>& manifestCatalog = file.getCatalog();
- manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
-
- shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
-
- if (async) {
- onSuccess(*manifestNames);
- }
- if (nextSegmentPtr != nullptr) {
- this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
- async, onSuccess, onFailed);
- }
- };
-
- auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
- (const Interest& interest) {
- ++m_retries;
- if (m_retries >= MAX_NUM_OF_RETRIES) {
- ++m_stats_table_iter;
- if (m_stats_table_iter == m_statsTable.end()) {
- m_stats_table_iter = m_statsTable.begin();
- }
- }
- if (async) {
- onFailed(interest.getName(), "Unknown error");
- }
- this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
- };
-
- m_face.expressInterest(*interest, dataReceived, dataFailed);
-
- if (!async) {
- m_face.processEvents();
- }
-}
-
-
-void
-TorrentManager::download_file_manifest(const Name& manifestName,
- const std::string& path,
- TorrentManager::ManifestReceivedCallback onSuccess,
- TorrentManager::FailedCallback onFailed)
-{
- shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
- auto packetNames = make_shared<std::vector<Name>>();
- if (searchRes == nullptr) {
- this->findDataPacketsToDownload(manifestName, *packetNames);
- onSuccess(*packetNames);
- return;
- }
- this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
-}
-
void
TorrentManager::downloadFileManifestSegment(const Name& manifestName,
const std::string& path,
@@ -575,7 +680,9 @@
FileManifest file(data.wireEncode());
// Write the file manifest segment to disk...
- writeFileManifest(file, path);
+ if( writeFileManifest(file, path)) {
+ seed(file);
+ }
const std::vector<Name>& packetsCatalog = file.catalog();
packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
@@ -598,71 +705,78 @@
onFailed(interest.getName(), "Unknown failure");
};
- m_face.expressInterest(*interest, dataReceived, dataFailed);
+ m_face->expressInterest(*interest, dataReceived, dataFailed);
}
void
-TorrentManager::download_data_packet(const Name& packetName,
- DataReceivedCallback onSuccess,
- FailedCallback onFailed)
+TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
{
- if (this->dataAlreadyDownloaded(packetName)) {
- onSuccess(packetName);
- return;
+ // handle if it is a torrent-file
+ const auto& interestName = interest.getName();
+ std::shared_ptr<Data> data = nullptr;
+ auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
+
+ // determine if it is torrent file (that we have)
+ auto torrent_it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(), cmp);
+ if (m_torrentSegments.end() != torrent_it) {
+ data = std::make_shared<Data>(*torrent_it);
}
-
- shared_ptr<Interest> interest = this->createInterest(packetName);
-
- auto dataReceived = [onSuccess, onFailed, this]
- (const Interest& interest, const Data& data) {
- // Write data to disk...
- writeData(data);
-
- // Stats Table update here...
- m_stats_table_iter->incrementReceivedData();
- m_retries = 0;
- onSuccess(data.getName());
- };
- auto dataFailed = [onFailed, this]
- (const Interest& interest) {
- m_retries++;
- if (m_retries >= MAX_NUM_OF_RETRIES) {
- m_stats_table_iter++;
- if (m_stats_table_iter == m_statsTable.end())
- m_stats_table_iter = m_statsTable.begin();
+ else {
+ // determine if it is manifest (that we have)
+ auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(), cmp);
+ if (m_fileManifests.end() != manifest_it) {
+ data = std::make_shared<Data>(*manifest_it) ;
}
- onFailed(interest.getName(), "Unknown failure");
- };
-
- m_face.expressInterest(*interest, dataReceived, dataFailed);
+ else {
+ // determine if it is data packet (that we have)
+ auto manifestName = interestName.getSubName(0, interestName.size() - 2);
+ auto map_it = std::find_if(m_fileStates.begin(), m_fileStates.end(),
+ [&manifestName](const std::pair<Name,
+ std::pair<std::shared_ptr<fs::fstream>,
+ std::vector<bool>>>& kv){
+ return manifestName.isPrefixOf(kv.first);
+ });
+ if (m_fileStates.end() != map_it) {
+ auto packetName = interestName.getSubName(0, interestName.size() - 1);
+ // get out the bitmap to be sure we have the packet
+ auto& fileState = map_it->second;
+ const auto &bitmap = fileState.second;
+ auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
+ if (bitmap[packetNum]) {
+ // get the manifest
+ auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
+ [&manifestName](const FileManifest& m) {
+ return manifestName.isPrefixOf(m.name());
+ });
+ auto manifestFileName = manifest_it->file_name();
+ auto filePath = m_dataPath + manifestFileName;
+ // TODO(msweatt) Explore why fileState stream does not work
+ fs::fstream is (filePath, fs::fstream::in | fs::fstream::binary);
+ data = readDataPacket(interestName,
+ *manifest_it,
+ m_subManifestSizes[manifestFileName],
+ is);
+ }
+ }
+ }
+ }
+ if (nullptr != data) {
+ m_face->put(*data);
+ }
+ else {
+ // TODO(msweatt) NACK
+ std::cerr << "NACK: " << interest << std::endl;
+ }
+ return;
}
-shared_ptr<Interest>
-TorrentManager::createInterest(Name name)
+void
+TorrentManager::onRegisterFailed(const Name& prefix, const std::string& reason)
{
- shared_ptr<Interest> interest = make_shared<Interest>(name);
- interest->setInterestLifetime(time::milliseconds(2000));
- interest->setMustBeFresh(true);
-
- // Select routable prefix
- Link link(name, { {1, m_stats_table_iter->getRecordName()} });
- m_keyChain->sign(link, signingWithSha256());
- Block linkWire = link.wireEncode();
-
- // Stats Table update here...
- m_stats_table_iter->incrementSentInterests();
-
- m_sortingCounter++;
- if (m_sortingCounter >= SORTING_INTERVAL) {
- m_sortingCounter = 0;
- m_statsTable.sort();
- m_stats_table_iter = m_statsTable.begin();
- m_retries = 0;
- }
-
- interest->setLink(linkWire);
-
- return interest;
+ std::cerr << "ERROR: Failed to register prefix \""
+ << prefix << "\" in local hub's daemon (" << reason << ")"
+ << std::endl;
+ m_face->shutdown();
}
shared_ptr<Name>
@@ -701,6 +815,24 @@
}
}
+void
+TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
+{
+ std::vector<Name> manifests;
+ // insert the first segment name of all the file manifests to the vector
+ for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
+ manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
+ }
+ // for each file
+ for (const auto& manifestName : manifests) {
+ // find the first (if any) segment we are missing
+ shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
+ if (nullptr != manifestSegmentName) {
+ manifestNames.push_back(*manifestSegmentName);
+ }
+ }
+}
+
bool
TorrentManager::dataAlreadyDownloaded(const Name& dataName)
{
@@ -726,24 +858,6 @@
}
void
-TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
-{
- std::vector<Name> manifests;
- // insert the first segment name of all the file manifests to the vector
- for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
- manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
- }
- // for each file
- for (const auto& manifestName : manifests) {
- // find the first (if any) segment we are missing
- shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
- if (nullptr != manifestSegmentName) {
- manifestNames.push_back(*manifestSegmentName);
- }
- }
-}
-
-void
TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames)
{
auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
@@ -781,5 +895,33 @@
}
}
+shared_ptr<Interest>
+TorrentManager::createInterest(Name name)
+{
+ shared_ptr<Interest> interest = make_shared<Interest>(name);
+ interest->setInterestLifetime(time::milliseconds(2000));
+ interest->setMustBeFresh(true);
+
+ // Select routable prefix
+ Link link(name, { {1, m_stats_table_iter->getRecordName()} });
+ m_keyChain->sign(link, signingWithSha256());
+ Block linkWire = link.wireEncode();
+
+ // Stats Table update here...
+ m_stats_table_iter->incrementSentInterests();
+
+ m_sortingCounter++;
+ if (m_sortingCounter >= SORTING_INTERVAL) {
+ m_sortingCounter = 0;
+ m_statsTable.sort();
+ m_stats_table_iter = m_statsTable.begin();
+ m_retries = 0;
+ }
+
+ interest->setLink(linkWire);
+
+ return interest;
+}
+
} // end ntorrent
-} // end ndn
+} // end ndn
\ No newline at end of file
diff --git a/src/torrent-manager.hpp b/src/torrent-manager.hpp
index 28053f2..997f322 100644
--- a/src/torrent-manager.hpp
+++ b/src/torrent-manager.hpp
@@ -66,20 +66,9 @@
* The behavior is undefined unless Initialize() is called before calling any other method on a
* TorrentManger object.
*/
- TorrentManager(const ndn::Name& torrentFileName,
- const std::string& dataPath,
- ndn::Face& face);
-
- /*
- * \brief Create a new Torrent manager with the specified parameters.
- * @param torrentFileName The full name of the initial segment of the torrent file
- * @param dataPath The path to the location on disk to use for the torrent data
- *
- * The behavior is undefined unless Initialize() is called before calling any other method on a
- * TorrentManger object.
- */
- TorrentManager(const ndn::Name& torrentFileName,
- const std::string& dataPath);
+ TorrentManager(const ndn::Name& torrentFileName,
+ const std::string& dataPath,
+ std::shared_ptr<Face> face = nullptr);
/*
* @brief Initialize the state of this object.
@@ -156,7 +145,7 @@
// Seed the specified 'data' to the network.
void
- seed(const Data& data) const;
+ seed(const Data& data);
protected:
/*
@@ -240,11 +229,21 @@
SORTING_INTERVAL = 100
};
+ void onDataReceived(const Data& data);
+
+ void
+ onInterestReceived(const InterestFilter& filter, const Interest& interest);
+
+ void
+ onRegisterFailed(const Name& prefix, const std::string& reason);
+
// A map from each fileManifest to corresponding file stream on disk and a bitmap of which Data
// packets this manager currently has
mutable std::unordered_map<Name,
std::pair<std::shared_ptr<fs::fstream>,
std::vector<bool>>> m_fileStates;
+ // A map for each initial manifest to the size for the sub-manifest
+ std::unordered_map<std::string, size_t> m_subManifestSizes;
// The segments of the TorrentFile this manager has
std::vector<TorrentFile> m_torrentSegments;
// The FileManifests this manager has
@@ -322,7 +321,7 @@
// Stats table where routable prefixes are stored
StatsTable m_statsTable;
// Face used for network communication
- Face& m_face;
+ std::shared_ptr<Face> m_face;
// Iterator to the routable prefix that we currently use
StatsTable::iterator m_stats_table_iter;
// Number of retries per routable prefix
@@ -336,7 +335,7 @@
inline
TorrentManager::TorrentManager(const ndn::Name& torrentFileName,
const std::string& dataPath,
- ndn::Face& face)
+ std::shared_ptr<Face> face)
: m_fileStates()
, m_torrentSegments()
, m_fileManifests()
@@ -347,26 +346,9 @@
, m_sortingCounter(0)
, m_keyChain(new KeyChain())
{
- // Hardcoded prefixes for now
- // TODO(Spyros): Think of something more clever to bootstrap...
- m_statsTable.insert("/ucla");
- m_statsTable.insert("/arizona");
- m_stats_table_iter = m_statsTable.begin();
-}
-
-inline
-TorrentManager::TorrentManager(const ndn::Name& torrentFileName,
- const std::string& dataPath)
-: m_fileStates()
-, m_torrentSegments()
-, m_fileManifests()
-, m_torrentFileName(torrentFileName)
-, m_dataPath(dataPath)
-, m_face(*(new ndn::Face()))
-, m_retries(0)
-, m_sortingCounter(0)
-, m_keyChain(new KeyChain())
-{
+ if(face == nullptr) {
+ face = make_shared<Face>();
+ }
// Hardcoded prefixes for now
// TODO(Spyros): Think of something more clever to bootstrap...
m_statsTable.insert("/ucla");
diff --git a/tests/unit-tests/torrent-manager.t.cpp b/tests/unit-tests/torrent-manager.t.cpp
index 29938eb..eba4ae4 100644
--- a/tests/unit-tests/torrent-manager.t.cpp
+++ b/tests/unit-tests/torrent-manager.t.cpp
@@ -50,9 +50,9 @@
{
}
- TestTorrentManager(const ndn::Name& torrentFileName,
- const std::string& filePath,
- DummyClientFace& face)
+ TestTorrentManager(const ndn::Name& torrentFileName,
+ const std::string& filePath,
+ std::shared_ptr<DummyClientFace> face)
: TorrentManager(torrentFileName, filePath, face)
{
}
@@ -118,12 +118,17 @@
public:
explicit
FaceFixture(bool enableRegistrationReply = true)
- : face(io, { true, enableRegistrationReply })
+ : face(new DummyClientFace(io, { true, enableRegistrationReply }))
{
}
+ ~FaceFixture()
+ {
+ fs::remove_all(".appdata");
+ }
+
public:
- DummyClientFace face;
+ std::shared_ptr<DummyClientFace> face;
};
class FacesNoRegistrationReplyFixture : public FaceFixture
@@ -133,64 +138,85 @@
: FaceFixture(false)
{
}
+
};
-BOOST_AUTO_TEST_SUITE(TestTorrentManagerInitialize)
+BOOST_FIXTURE_TEST_SUITE(TestTorrentManagerInitialize, FaceFixture)
BOOST_AUTO_TEST_CASE(CheckInitializeComplete)
{
- vector<FileManifest> manifests;
- vector<TorrentFile> torrentSegments;
- std::string filePath = "tests/testdata/";
- // get torrent files and manifests
- {
- auto temp = TorrentFile::generate("tests/testdata/foo",
- 1024,
- 1024,
- 1024,
- false);
- torrentSegments = temp.first;
- auto temp1 = temp.second;
- for (const auto& ms : temp1) {
- manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
- }
- }
- // write the torrent segments and manifests to disk
- std::string dirPath = ".appdata/foo/";
- boost::filesystem::create_directories(dirPath);
- std::string torrentPath = dirPath + "torrent_files/";
- boost::filesystem::create_directory(torrentPath);
- auto fileNum = 0;
- for (const auto& t : torrentSegments) {
- fileNum++;
- auto filename = torrentPath + to_string(fileNum);
- io::save(t, filename);
- }
- //fileNum = 0;
- auto manifestPath = dirPath + "manifests/";
- boost::filesystem::create_directory(manifestPath);
- for (const auto& m : manifests) {
- fs::path filename = manifestPath + m.file_name() + "/" + to_string(m.submanifest_number());
- boost::filesystem::create_directories(filename.parent_path());
- io::save(m, filename.string());
- }
- // Initialize and verify
- TestTorrentManager manager("/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253",
- filePath);
- manager.Initialize();
+ const struct {
+ const char *d_directoryPath;
+ const char *d_initialSegmentName;
+ size_t d_namesPerSegment;
+ size_t d_subManifestSize;
+ size_t d_dataPacketSize;
+ } DATA [] = {
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253", 1024, 1024, 1024},
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=b88c054e87bcbb744726f7eaf79f95459b4fddce2caeb952f263a5ccbbfc9a7c", 128, 128, 128},
+ // {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=76df604f23bdf257d16de588f2941df261951552a5f4435a315f59c3b018a851", 1, 1, 128},
+ };
+ enum { NUM_DATA = sizeof DATA / sizeof *DATA };
+ for (int i = 0; i < NUM_DATA; ++i) {
+ auto directoryPath = DATA[i].d_directoryPath;
+ Name initialSegmentName = DATA[i].d_initialSegmentName;
+ auto namesPerSegment = DATA[i].d_namesPerSegment;
+ auto dataPacketSize = DATA[i].d_dataPacketSize;
+ auto subManifestSize = DATA[i].d_subManifestSize;
- // Check that the torrent segments and file manifests match (content and order)
- BOOST_CHECK(manager.torrentSegments() == torrentSegments);
- BOOST_CHECK(manager.fileManifests() == manifests);
- // next check the data packet state vectors
- for (auto m : manager.fileManifests()) {
- auto fileState = manager.fileState(m.getFullName());
- BOOST_CHECK(fileState.size() == m.catalog().size());
- for (auto s : fileState) {
- BOOST_CHECK(s);
+ vector<FileManifest> manifests;
+ vector<TorrentFile> torrentSegments;
+ std::string filePath = "tests/testdata/";
+ // get torrent files and manifests
+ {
+ auto temp = TorrentFile::generate(directoryPath,
+ namesPerSegment,
+ subManifestSize,
+ dataPacketSize,
+ false);
+ torrentSegments = temp.first;
+ auto temp1 = temp.second;
+ for (const auto& ms : temp1) {
+ manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
+ }
}
+ // write the torrent segments and manifests to disk
+ std::string dirPath = ".appdata/foo/";
+ boost::filesystem::create_directories(dirPath);
+ std::string torrentPath = dirPath + "torrent_files/";
+ boost::filesystem::create_directory(torrentPath);
+ auto fileNum = 0;
+ for (const auto& t : torrentSegments) {
+ fileNum++;
+ auto filename = torrentPath + to_string(fileNum);
+ io::save(t, filename);
+ }
+ auto manifestPath = dirPath + "manifests/";
+ boost::filesystem::create_directory(manifestPath);
+ for (const auto& m : manifests) {
+ fs::path filename = manifestPath + m.file_name() + "/" + to_string(m.submanifest_number());
+ boost::filesystem::create_directories(filename.parent_path());
+ io::save(m, filename.string());
+ }
+ // Initialize and verify
+ TestTorrentManager manager(initialSegmentName,
+ filePath,
+ face);
+ manager.Initialize();
+
+ // Check that the torrent segments and file manifests match (content and order)
+ BOOST_CHECK(manager.torrentSegments() == torrentSegments);
+ BOOST_CHECK(manager.fileManifests() == manifests);
+ // next check the data packet state vectors
+ for (auto m : manager.fileManifests()) {
+ auto fileState = manager.fileState(m.getFullName());
+ BOOST_CHECK(fileState.size() == m.catalog().size());
+ for (auto s : fileState) {
+ BOOST_CHECK(s);
+ }
+ }
+ fs::remove_all(dirPath);
}
- fs::remove_all(dirPath);
}
BOOST_AUTO_TEST_CASE(CheckInitializeEmpty)
@@ -204,96 +230,141 @@
BOOST_AUTO_TEST_CASE(CheckInitializeNoManifests)
{
- vector<TorrentFile> torrentSegments;
- std::string filePath = "tests/testdata/";
- // get torrent files and manifests
- {
- auto temp = TorrentFile::generate("tests/testdata/foo",
- 1024,
- 1024,
- 1024,
- false);
- torrentSegments = temp.first;
- }
- // write the torrent segments but no manifests to disk
- std::string dirPath = ".appdata/foo/";
- boost::filesystem::create_directories(dirPath);
- std::string torrentPath = dirPath + "torrent_files/";
- boost::filesystem::create_directory(torrentPath);
- auto fileNum = 0;
- for (const auto& t : torrentSegments) {
- fileNum++;
- auto filename = torrentPath + to_string(fileNum);
- io::save(t, filename);
- }
- // Initialize and verify
- TestTorrentManager manager("/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253",
- filePath);
- manager.Initialize();
+ const struct {
+ const char *d_directoryPath;
+ const char *d_initialSegmentName;
+ size_t d_namesPerSegment;
+ size_t d_subManifestSize;
+ size_t d_dataPacketSize;
+ } DATA [] = {
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253", 1024, 1024, 1024},
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=b88c054e87bcbb744726f7eaf79f95459b4fddce2caeb952f263a5ccbbfc9a7c", 128, 128, 128},
+ // {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=76df604f23bdf257d16de588f2941df261951552a5f4435a315f59c3b018a851", 1, 1, 128},
+ };
+ enum { NUM_DATA = sizeof DATA / sizeof *DATA };
+ for (int i = 0; i < NUM_DATA; ++i) {
+ auto directoryPath = DATA[i].d_directoryPath;
+ Name initialSegmentName = DATA[i].d_initialSegmentName;
+ auto namesPerSegment = DATA[i].d_namesPerSegment;
+ auto dataPacketSize = DATA[i].d_dataPacketSize;
+ auto subManifestSize = DATA[i].d_subManifestSize;
- // Check that the torrent segments and file manifests match (content and order)
- BOOST_CHECK(manager.torrentSegments() == torrentSegments);
- BOOST_CHECK(manager.fileManifests() == vector<FileManifest>());
+ vector<FileManifest> manifests;
+ vector<TorrentFile> torrentSegments;
+ std::string filePath = "tests/testdata/";
+ // get torrent files and manifests
+ {
+ auto temp = TorrentFile::generate(directoryPath,
+ namesPerSegment,
+ subManifestSize,
+ dataPacketSize,
+ false);
+ torrentSegments = temp.first;
+ }
+ // write the torrent segments and manifests to disk
+ std::string dirPath = ".appdata/foo/";
+ boost::filesystem::create_directories(dirPath);
+ std::string torrentPath = dirPath + "torrent_files/";
+ boost::filesystem::create_directory(torrentPath);
+ auto fileNum = 0;
+ for (const auto& t : torrentSegments) {
+ fileNum++;
+ auto filename = torrentPath + to_string(fileNum);
+ io::save(t, filename);
+ }
+ // Initialize and verify
+ TestTorrentManager manager(initialSegmentName,
+ filePath,
+ face);
+ manager.Initialize();
- fs::remove_all(dirPath);
+ // Check that the torrent segments and file manifests match (content and order)
+ BOOST_CHECK(manager.torrentSegments() == torrentSegments);
+ BOOST_CHECK(manager.fileManifests() == vector<FileManifest>());
+
+ fs::remove_all(".appdata");
+ }
}
BOOST_AUTO_TEST_CASE(CheckInitializeMissingManifests)
{
- vector<FileManifest> manifests;
- vector<TorrentFile> torrentSegments;
- std::string filePath = "tests/testdata/";
- // get torrent files and manifests
- {
- auto temp = TorrentFile::generate("tests/testdata/foo",
- 1024,
- 1024,
- 1024,
- false);
- torrentSegments = temp.first;
- auto temp1 = temp.second;
- temp1.pop_back(); // remove the manifests for the last file
- for (const auto& ms : temp1) {
- manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
- }
- }
- // write the torrent segments and manifests to disk
- std::string dirPath = ".appdata/foo/";
- boost::filesystem::create_directories(dirPath);
- std::string torrentPath = dirPath + "torrent_files/";
- boost::filesystem::create_directories(torrentPath);
- auto fileNum = 0;
- for (const auto& t : torrentSegments) {
- fileNum++;
- auto filename = torrentPath + to_string(fileNum);
- io::save(t, filename);
- }
- auto manifestPath = dirPath + "manifests/";
- boost::filesystem::create_directory(manifestPath);
- for (const auto& m : manifests) {
- fs::path filename = manifestPath + m.file_name() + to_string(m.submanifest_number());
- boost::filesystem::create_directory(filename.parent_path());
- io::save(m, filename.string());
- }
- // Initialize and verify
- TestTorrentManager manager("/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253",
- filePath);
- manager.Initialize();
+ const struct {
+ const char *d_directoryPath;
+ const char *d_initialSegmentName;
+ size_t d_namesPerSegment;
+ size_t d_subManifestSize;
+ size_t d_dataPacketSize;
+ } DATA [] = {
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253", 1024, 1024, 1024},
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=b88c054e87bcbb744726f7eaf79f95459b4fddce2caeb952f263a5ccbbfc9a7c", 128, 128, 128},
+ // {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=76df604f23bdf257d16de588f2941df261951552a5f4435a315f59c3b018a851", 1, 1, 128},
+ };
+ enum { NUM_DATA = sizeof DATA / sizeof *DATA };
+ for (int i = 0; i < NUM_DATA; ++i) {
+ auto directoryPath = DATA[i].d_directoryPath;
+ Name initialSegmentName = DATA[i].d_initialSegmentName;
+ auto namesPerSegment = DATA[i].d_namesPerSegment;
+ auto dataPacketSize = DATA[i].d_dataPacketSize;
+ auto subManifestSize = DATA[i].d_subManifestSize;
- // Check that the torrent segments and file manifests match (content and order)
- BOOST_CHECK(manager.torrentSegments() == torrentSegments);
- BOOST_CHECK(manager.fileManifests() == manifests);
- // next check the data packet state vectors
- for (auto m : manager.fileManifests()) {
- auto fileState = manager.fileState(m.getFullName());
- BOOST_CHECK(fileState.size() == m.catalog().size());
- for (auto s : fileState) {
- BOOST_CHECK(s);
+ vector<FileManifest> manifests;
+ vector<TorrentFile> torrentSegments;
+ std::string filePath = "tests/testdata/";
+ // get torrent files and manifests
+ {
+ auto temp = TorrentFile::generate(directoryPath,
+ namesPerSegment,
+ subManifestSize,
+ dataPacketSize,
+ false);
+ torrentSegments = temp.first;
+ auto temp1 = temp.second;
+ temp1.pop_back(); // remove the manifests for the last file
+ for (const auto& ms : temp1) {
+ manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
+ }
}
+ // write the torrent segments and manifests to disk
+ std::string dirPath = ".appdata/foo/";
+ boost::filesystem::create_directories(dirPath);
+ std::string torrentPath = dirPath + "torrent_files/";
+ boost::filesystem::create_directories(torrentPath);
+ auto fileNum = 0;
+ for (const auto& t : torrentSegments) {
+ fileNum++;
+ auto filename = torrentPath + to_string(fileNum);
+ io::save(t, filename);
+ }
+ auto manifestPath = dirPath + "manifests/";
+ boost::filesystem::create_directory(manifestPath);
+ for (const auto& m : manifests) {
+ fs::path filename = manifestPath + m.file_name() + to_string(m.submanifest_number());
+ boost::filesystem::create_directory(filename.parent_path());
+ io::save(m, filename.string());
+ }
+ // Initialize and verify
+ TestTorrentManager manager(initialSegmentName,
+ filePath,
+ face);
+ manager.Initialize();
+
+ // Check that the torrent segments and file manifests match (content and order)
+ BOOST_CHECK(manager.torrentSegments() == torrentSegments);
+ BOOST_CHECK(manager.fileManifests() == manifests);
+ // next check the data packet state vectors
+ for (auto m : manager.fileManifests()) {
+ auto fileState = manager.fileState(m.getFullName());
+ BOOST_CHECK(fileState.size() == m.catalog().size());
+ for (auto s : fileState) {
+ BOOST_CHECK(s);
+ }
+ }
+ fs::remove_all(".appdata");
}
- fs::remove_all(dirPath);
}
+BOOST_AUTO_TEST_SUITE_END()
+
BOOST_FIXTURE_TEST_SUITE(TestTorrentManagerNetworkingStuff, FaceFixture)
BOOST_AUTO_TEST_CASE(TestDownloadingTorrentFile)
@@ -337,7 +408,7 @@
for (auto i = torrentSegments.begin(); i != torrentSegments.end(); i++) {
advanceClocks(time::milliseconds(1), 40);
- face.receive(dynamic_cast<Data&>(*i));
+ face->receive(dynamic_cast<Data&>(*i));
}
fs::remove_all(filePath);
@@ -385,7 +456,7 @@
});
advanceClocks(time::milliseconds(1), 40);
- face.receive(dynamic_cast<Data&>(manifests[0]));
+ face->receive(dynamic_cast<Data&>(manifests[0]));
manager.download_file_manifest(manifests[1].getFullName(), filePath + "manifests",
[&counter, &manifests]
@@ -411,7 +482,7 @@
for (auto i = manifests.begin() + 1; i != manifests.end(); i++) {
advanceClocks(time::milliseconds(1), 40);
- face.receive(dynamic_cast<Data&>(*i));
+ face->receive(dynamic_cast<Data&>(*i));
}
fs::remove_all(filePath);
@@ -443,7 +514,7 @@
data->wireEncode();
advanceClocks(time::milliseconds(1), 40);
- face.receive(*data);
+ face->receive(*data);
// Fail to download data
manager.download_data_packet(dataName,
@@ -558,7 +629,8 @@
}
// Initialize manager
TestTorrentManager manager("/NTORRENT/foo/torrent-file/sha256digest=a8a2e98cd943d895b8c4b12a208343bcf9344ce85a6376dc6f5754fe8f4a573e",
- filePath);
+ filePath,
+ face);
manager.Initialize();
// Set the file state
@@ -600,7 +672,6 @@
fs::remove_all(".appdata");
}
-
BOOST_AUTO_TEST_CASE(TestFindManifestSegmentToDownload1)
{
std::string filePath = ".appdata/foo/";
@@ -669,9 +740,6 @@
n10 = Name(n10.toUri() + "/sha256digest");
BOOST_CHECK_EQUAL(manager.findManifestSegmentToDownload(n10)->toUri(), n10.toUri());
-
- fs::remove_all(filePath);
- fs::remove_all(".appdata");
}
BOOST_AUTO_TEST_CASE(TestFindManifestSegmentToDownload2)
@@ -716,7 +784,8 @@
}
// Initialize manager
TestTorrentManager manager("/NTORRENT/foo/torrent-file/sha256digest=a8a2e98cd943d895b8c4b12a208343bcf9344ce85a6376dc6f5754fe8f4a573e",
- filePath);
+ filePath,
+ face);
manager.Initialize();
// Set the file state
@@ -814,7 +883,8 @@
}
// Initialize manager
TestTorrentManager manager("/NTORRENT/foo/torrent-file/sha256digest=a8a2e98cd943d895b8c4b12a208343bcf9344ce85a6376dc6f5754fe8f4a573e",
- filePath);
+ filePath,
+ face);
manager.Initialize();
// Set the file state
@@ -842,17 +912,151 @@
BOOST_CHECK(manager.dataAlreadyDownloaded(p2));
}
-BOOST_AUTO_TEST_SUITE_END()
+BOOST_AUTO_TEST_CASE(CheckSeedComplete)
+{
+ const struct {
+ const char *d_directoryPath;
+ const char *d_initialSegmentName;
+ size_t d_namesPerSegment;
+ size_t d_subManifestSize;
+ size_t d_dataPacketSize;
+ } DATA [] = {
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253", 1024, 1024, 1024},
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=b88c054e87bcbb744726f7eaf79f95459b4fddce2caeb952f263a5ccbbfc9a7c", 128, 128, 128},
+ // {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=76df604f23bdf257d16de588f2941df261951552a5f4435a315f59c3b018a851", 1, 1, 128},
+ };
+ enum { NUM_DATA = sizeof DATA / sizeof *DATA };
+ for (int i = 0; i < NUM_DATA; ++i) {
+ auto directoryPath = DATA[i].d_directoryPath;
+ Name initialSegmentName = DATA[i].d_initialSegmentName;
+ auto namesPerSegment = DATA[i].d_namesPerSegment;
+ auto dataPacketSize = DATA[i].d_dataPacketSize;
+ auto subManifestSize = DATA[i].d_subManifestSize;
-BOOST_AUTO_TEST_SUITE(CheckTorrentManagerUtilities)
+ vector<FileManifest> manifests;
+ vector<TorrentFile> torrentSegments;
+ std::string filePath = "tests/testdata/";
+ std::vector<vector<Data>> fileData;
+ // get torrent files and manifests
+ {
+ auto temp = TorrentFile::generate(directoryPath,
+ namesPerSegment,
+ subManifestSize,
+ dataPacketSize,
+ false);
+ torrentSegments = temp.first;
+ auto temp1 = temp.second;
+ for (const auto& ms : temp1) {
+ manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
+ fileData.push_back(ms.second);
+ }
+ }
+ // write the torrent segments and manifests to disk
+ std::string dirPath = ".appdata/foo/";
+ boost::filesystem::create_directories(dirPath);
+ std::string torrentPath = dirPath + "torrent_files/";
+ boost::filesystem::create_directory(torrentPath);
+ auto fileNum = 0;
+ for (const auto& t : torrentSegments) {
+ fileNum++;
+ auto filename = torrentPath + to_string(fileNum);
+ io::save(t, filename);
+ }
+ auto manifestPath = dirPath + "manifests/";
+ boost::filesystem::create_directory(manifestPath);
+ for (const auto& m : manifests) {
+ fs::path filename = manifestPath + m.file_name() + "/" + to_string(m.submanifest_number());
+ boost::filesystem::create_directories(filename.parent_path());
+ io::save(m, filename.string());
+ }
+ // Initialize and verify
+ TestTorrentManager manager(initialSegmentName,
+ filePath,
+ face);
+ manager.Initialize();
+ size_t nData = 0;
+ BOOST_CHECK_EQUAL(0, face->sentData.size());
+ // request all the torrent segments
+ for (const auto& t : torrentSegments) {
+ Interest interest(t.getFullName(), time::milliseconds(50));
+ face->expressInterest(interest,
+ [&t](const Interest& i, const Data& d) {
+ TorrentFile t1(d.wireEncode());
+ BOOST_CHECK(t == d);
+ BOOST_CHECK(t1 == t);
+ },
+ bind([] {
+ BOOST_FAIL("Unexpected Nack");
+ }),
+ bind([] {
+ BOOST_FAIL("Unexpected timeout");
+ }));
+ advanceClocks(time::milliseconds(1), 40);
+ face->receive(interest);
+ face->processEvents(time::milliseconds(-1));
+ // check that one piece of data is sent, and it is what was expected
+ BOOST_CHECK_EQUAL(++nData, face->sentData.size());
+ face->receive(face->sentData[nData - 1]);
+ }
+ // request all the file manifests
+ for (const auto& m : manifests) {
+ Interest interest(m.getFullName(), time::milliseconds(50));
+ face->expressInterest(interest,
+ [&m](const Interest& i, const Data& d) {
+ FileManifest m1(d.wireEncode());
+ BOOST_CHECK(m == d);
+ BOOST_CHECK(m1 == m);
+ },
+ bind([] {
+ BOOST_FAIL("Unexpected Nack");
+ }),
+ bind([] {
+ BOOST_FAIL("Unexpected timeout");
+ }));
+ advanceClocks(time::milliseconds(1), 40);
+ face->receive(interest);
+ face->processEvents(time::milliseconds(-1));
+ // check that one piece of data is sent, and it is what was expected
+ BOOST_CHECK_EQUAL(++nData, face->sentData.size());
+ face->receive(face->sentData[nData - 1]);
+ }
+ // request all the data packets
+ for (const auto& file : fileData) {
+ for (const auto& data : file) {
+ Interest interest(data. getFullName(), time::milliseconds(50));
+ face->expressInterest(interest,
+ [&data](const Interest& i, const Data& d) {
+ BOOST_CHECK(data == d);
+ },
+ bind([] {
+ BOOST_FAIL("Unexpected Nack");
+ }),
+ bind([] {
+ BOOST_FAIL("Unexpected timeout");
+ }));
+ advanceClocks(time::milliseconds(1), 40);
+ face->receive(interest);
+ face->processEvents(time::milliseconds(-1));
+ // check that one piece of data is sent, and it is what was expected
+ BOOST_CHECK_EQUAL(++nData, face->sentData.size());
+ face->receive(face->sentData[nData - 1]);
+ }
+ }
+ // clean up tests
+ face->sentData.clear();
+ fs::remove_all(".appdata/");
+ }
+}
-BOOST_AUTO_TEST_CASE(CheckWriteDataComplete)
+BOOST_AUTO_TEST_CASE(CheckSeedRandom)
{
vector<FileManifest> manifests;
vector<TorrentFile> torrentSegments;
// for each file, the data packets
- std::vector<vector<Data>> fileData;
- std::string filePath = "tests/testdata/temp";
+ std::vector<Data> data;
+ std::string filePath = "tests/testdata/";
+ std::string dirPath = ".appdata/foo/";
+ Name initialSegmentName = "/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253";
// get torrent files and manifests
{
auto temp = TorrentFile::generate("tests/testdata/foo",
@@ -864,13 +1068,12 @@
auto temp1 = temp.second;
for (const auto& ms : temp1) {
manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
- fileData.push_back(ms.second);
+ data.insert(data.end(), ms.second.begin(), ms.second.end());
}
}
// write the torrent segments and manifests to disk
- std::string dirPath = ".appdata/foo/";
boost::filesystem::create_directories(dirPath);
- std::string torrentPath = dirPath + "torrent_files/";
+ auto torrentPath = dirPath + "torrent_files/";
boost::filesystem::create_directories(torrentPath);
auto fileNum = 0;
for (const auto& t : torrentSegments) {
@@ -878,7 +1081,6 @@
auto filename = torrentPath + to_string(fileNum);
io::save(t, filename);
}
- //fileNum = 0;
auto manifestPath = dirPath + "manifests/";
boost::filesystem::create_directory(manifestPath);
for (const auto& m : manifests) {
@@ -886,57 +1088,156 @@
boost::filesystem::create_directory(filename.parent_path());
io::save(m, filename.string());
}
+
// Initialize manager
- TestTorrentManager manager("/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253",
- filePath);
+ TestTorrentManager manager(initialSegmentName,
+ filePath,
+ face);
manager.Initialize();
- // check that initially there is no data on disk
- for (auto m : manager.fileManifests()) {
- auto fileState = manager.fileState(m.getFullName());
- BOOST_CHECK(fileState.empty());
+
+ // insert the other entities
+ data.insert(data.end(), torrentSegments.begin(), torrentSegments.end());
+ data.insert(data.end(), manifests.begin(), manifests.end());
+
+ std::random_shuffle(data.begin(), data.end());
+ // request all the data packets
+ auto nData = 0;
+ for(const auto& d : data) {
+ Interest interest(d.getFullName(), time::milliseconds(50));
+ face->expressInterest(interest,
+ [&d](const Interest& i, const Data& data) {
+ BOOST_CHECK(data == d);
+ },
+ bind([] {
+ BOOST_FAIL("Unexpected Nack");
+ }),
+ bind([] {
+ BOOST_FAIL("Unexpected timeout");
+ }));
+ advanceClocks(time::milliseconds(1), 40);
+ face->receive(interest);
+ face->processEvents(time::milliseconds(-1));
+ // check that one piece of data is sent, and it is what was expected
+ BOOST_CHECK_EQUAL(++nData, face->sentData.size());
+ face->receive(face->sentData[nData - 1]);
}
- // write all data to disk (for each file manifest)
- auto manifest_it = manifests.begin();
- for (auto& data : fileData) {
- for (auto& d : data) {
- BOOST_CHECK(manager.writeData(d));
- }
- // check that the state is updated appropriately
- auto fileState = manager.fileState(manifest_it->getFullName());
- for (auto s : fileState) {
- BOOST_CHECK(s);
- }
- ++manifest_it;
- }
- // get the file names (ascending)
- std::set<std::string> fileNames;
- for (auto i = fs::recursive_directory_iterator(filePath + "/foo");
- i != fs::recursive_directory_iterator();
- ++i) {
- fileNames.insert(i->path().string());
- }
- // verify file by file that the data packets are written correctly
- auto f_it = fileData.begin();
- for (auto f : fileNames) {
- // read file from disk
- std::vector<uint8_t> file_bytes;
- fs::ifstream is(f, fs::ifstream::binary | fs::ifstream::in);
- is >> std::noskipws;
- std::istream_iterator<uint8_t> start(is), end;
- file_bytes.insert(file_bytes.end(), start, end);
- std::vector<uint8_t> data_bytes;
- // get content from data packets
- for (const auto& d : *f_it) {
- auto content = d.getContent();
- data_bytes.insert(data_bytes.end(), content.value_begin(), content.value_end());
- }
- BOOST_CHECK(data_bytes == file_bytes);
- ++f_it;
- }
- fs::remove_all(filePath);
fs::remove_all(".appdata");
}
+BOOST_AUTO_TEST_SUITE_END()
+
+BOOST_FIXTURE_TEST_SUITE(CheckTorrentManagerUtilities, FaceFixture)
+
+BOOST_AUTO_TEST_CASE(CheckWriteDataComplete)
+{
+ const struct {
+ const char *d_directoryPath;
+ const char *d_initialSegmentName;
+ size_t d_namesPerSegment;
+ size_t d_subManifestSize;
+ size_t d_dataPacketSize;
+ } DATA [] = {
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=02c737fd4c6e7de4b4825b089f39700c2dfa8fd2bb2b91f09201e357c4463253", 1024, 1024, 1024},
+ {"tests/testdata/foo", "/NTORRENT/foo/torrent-file/sha256digest=b88c054e87bcbb744726f7eaf79f95459b4fddce2caeb952f263a5ccbbfc9a7c", 128, 128, 128},
+ };
+ enum { NUM_DATA = sizeof DATA / sizeof *DATA };
+ for (int i = 0; i < NUM_DATA; ++i) {
+ auto directoryPath = DATA[i].d_directoryPath;
+ Name initialSegmentName = DATA[i].d_initialSegmentName;
+ auto namesPerSegment = DATA[i].d_namesPerSegment;
+ auto dataPacketSize = DATA[i].d_dataPacketSize;
+ auto subManifestSize = DATA[i].d_subManifestSize;
+
+ vector<TorrentFile> torrentSegments;
+ vector<FileManifest> manifests;
+ // for each file, the data packets
+ std::vector<vector<Data>> fileData;
+ std::string filePath = "tests/testdata/temp";
+ // get torrent files and manifests
+ {
+ auto temp = TorrentFile::generate(directoryPath,
+ namesPerSegment,
+ subManifestSize,
+ dataPacketSize,
+ false);
+ torrentSegments = temp.first;
+ auto temp1 = temp.second;
+ for (const auto& ms : temp1) {
+ manifests.insert(manifests.end(), ms.first.begin(), ms.first.end());
+ fileData.push_back(ms.second);
+ }
+ }
+ // write the torrent segments and manifests to disk
+ std::string dirPath = ".appdata/foo/";
+ boost::filesystem::create_directories(dirPath);
+ std::string torrentPath = dirPath + "torrent_files/";
+ boost::filesystem::create_directories(torrentPath);
+ auto fileNum = 0;
+ for (const auto& t : torrentSegments) {
+ fileNum++;
+ auto filename = torrentPath + to_string(fileNum);
+ io::save(t, filename);
+ }
+ auto manifestPath = dirPath + "manifests/";
+ boost::filesystem::create_directory(manifestPath);
+ for (const auto& m : manifests) {
+ fs::path filename = manifestPath + m.file_name() + to_string(m.submanifest_number());
+ boost::filesystem::create_directory(filename.parent_path());
+ io::save(m, filename.string());
+ }
+ // Initialize manager
+ TestTorrentManager manager(initialSegmentName,
+ filePath,
+ face);
+ manager.Initialize();
+ // check that initially there is no data on disk
+ for (auto m : manager.fileManifests()) {
+ auto fileState = manager.fileState(m.getFullName());
+ BOOST_CHECK(fileState.empty());
+ }
+ // write all data to disk (for each file manifest)
+ auto manifest_it = manifests.begin();
+ for (auto& data : fileData) {
+ for (auto& d : data) {
+ BOOST_CHECK(manager.writeData(d));
+ }
+ // check that the state is updated appropriately
+ auto fileState = manager.fileState(manifest_it->getFullName());
+ for (auto s : fileState) {
+ BOOST_CHECK(s);
+ }
+ ++manifest_it;
+ }
+ // get the file names (ascending)
+ std::set<std::string> fileNames;
+ for (auto i = fs::recursive_directory_iterator(filePath + "/foo");
+ i != fs::recursive_directory_iterator();
+ ++i) {
+ fileNames.insert(i->path().string());
+ }
+ // verify file by file that the data packets are written correctly
+ auto f_it = fileData.begin();
+ for (auto f : fileNames) {
+ // read file from disk
+ std::vector<uint8_t> file_bytes;
+ fs::ifstream is(f, fs::ifstream::binary | fs::ifstream::in);
+ is >> std::noskipws;
+ std::istream_iterator<uint8_t> start(is), end;
+ file_bytes.insert(file_bytes.end(), start, end);
+ std::vector<uint8_t> data_bytes;
+ // get content from data packets
+ for (const auto& d : *f_it) {
+ auto content = d.getContent();
+ data_bytes.insert(data_bytes.end(), content.value_begin(), content.value_end());
+ }
+ BOOST_CHECK(data_bytes == file_bytes);
+ ++f_it;
+ }
+ fs::remove_all(filePath);
+ fs::remove_all(".appdata");
+ }
+}
+
BOOST_AUTO_TEST_CASE(CheckWriteTorrentComplete)
{
const struct {
@@ -970,7 +1271,8 @@
}
// Initialize manager
TestTorrentManager manager(initialSegmentName,
- filePath);
+ filePath,
+ face);
manager.Initialize();
std::string dirPath = ".appdata/foo/";
std::string torrentPath = dirPath + "torrent_files/";
@@ -981,7 +1283,8 @@
BOOST_CHECK(manager.torrentSegments() == torrentSegments);
// check that initializing a new manager also gets all the torrent torrentSegments
TestTorrentManager manager2(initialSegmentName,
- filePath);
+ filePath,
+ face);
manager2.Initialize();
BOOST_CHECK(manager2.torrentSegments() == torrentSegments);
@@ -1045,7 +1348,8 @@
}
}
TestTorrentManager manager(initialSegmentName,
- filePath);
+ filePath,
+ face);
manager.Initialize();
for (const auto& t : torrentSegments) {
manager.writeTorrentSegment(t, torrentPath);
@@ -1058,7 +1362,8 @@
BOOST_CHECK(manager.fileManifests() == manifests);
TestTorrentManager manager2(initialSegmentName,
- filePath);
+ filePath,
+ face);
manager2.Initialize();
BOOST_CHECK(manager2.fileManifests() == manifests);
@@ -1083,8 +1388,6 @@
BOOST_AUTO_TEST_SUITE_END()
-BOOST_AUTO_TEST_SUITE_END()
-
} // namespace tests
} // namespace nTorrent
} // namespace ndn