Implemented TorrentManager::seed. Fixed a bug in TorrentManager::Initialize that used an incorrect
submanifest size. Migrated torrent-manager to hold a shared_ptr to the face rather than a dangling
reference. Lastly, added more tests throughout and reorganized the code to match the order of
declaration.
Change-Id: Ia427d8047ff6e69d5f274482d0d9a17e953e1f0d
diff --git a/src/torrent-file.cpp b/src/torrent-file.cpp
index d7918d2..30957bf 100644
--- a/src/torrent-file.cpp
+++ b/src/torrent-file.cpp
@@ -239,6 +239,7 @@
size_t dataPacketSize,
bool returnData)
{
+ //TODO(spyros) Adapt this to support subdirectories in 'directoryPath'
BOOST_ASSERT(0 < namesPerSegment);
std::vector<TorrentFile> torrentSegments;
diff --git a/src/torrent-manager.cpp b/src/torrent-manager.cpp
index 4a0eaff..155d7cf 100644
--- a/src/torrent-manager.cpp
+++ b/src/torrent-manager.cpp
@@ -127,13 +127,47 @@
return packets;
}
+// =================================================================================================
+// Torrent Manager Utility Functions
+// ==================================================================================================
+
+static std::shared_ptr<Data>
+readDataPacket(const Name& packetFullName,
+ const FileManifest& manifest,
+ size_t subManifestSize,
+ fs::fstream& is) {
+ auto dataPacketSize = manifest.data_packet_size();
+ auto start_offset = manifest.submanifest_number() * subManifestSize * dataPacketSize;
+ auto packetNum = packetFullName.get(packetFullName.size() - 2).toSequenceNumber();
+ // seek to packet
+ is.sync();
+ is.seekg(start_offset + packetNum * dataPacketSize);
+ if (is.tellg() < 0) {
+ std::cerr << "bad seek" << std::endl;
+ }
+ // read contents
+ std::vector<char> bytes(dataPacketSize);
+ is.read(&bytes.front(), dataPacketSize);
+ auto read_size = is.gcount();
+ if (is.bad() || read_size < 0) {
+ std::cerr << "Bad read" << std::endl;
+ return nullptr;
+ }
+ // construct packet
+ auto packetName = packetFullName.getSubName(0, packetFullName.size() - 1);
+ auto d = make_shared<Data>(packetName);
+ d->setContent(encoding::makeBinaryBlock(tlv::Content, &bytes.front(), read_size));
+ ndn::security::KeyChain key_chain;
+ key_chain.sign(*d, signingWithSha256());
+ return d->getFullName() == packetFullName ? d : nullptr;
+}
+
static vector<TorrentFile>
intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
{
security::KeyChain key_chain;
Name currSegmentFullName = initialSegmentName;
vector<TorrentFile> torrentSegments = load_directory<TorrentFile>(torrentFilePath);
-
// Starting with the initial segment name, verify the names, loading next name from torrentSegment
for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
TorrentFile& segment = *it;
@@ -153,85 +187,75 @@
}
static vector<FileManifest>
-intializeFileManifests(const string& manifestPath, vector<TorrentFile> torrentSegments)
+intializeFileManifests(const string& manifestPath, const vector<TorrentFile>& torrentSegments)
{
security::KeyChain key_chain;
vector<FileManifest> manifests = load_directory<FileManifest>(manifestPath);
-
+ if (manifests.empty()) {
+ return manifests;
+ }
// sign the manifests
std::for_each(manifests.begin(), manifests.end(),
[&key_chain](FileManifest& m){
key_chain.sign(m,signingWithSha256());
});
- // put all names of manifests from the valid torrent files into a set
- std::set<ndn::Name> validManifestNames;
+ // put all names of initial manifests from the valid torrent files into a vector
+ std::vector<ndn::Name> validInitialManifestNames;
for (const auto& segment : torrentSegments) {
const auto& catalog = segment.getCatalog();
- validManifestNames.insert(catalog.begin(), catalog.end());
+ validInitialManifestNames.insert(validInitialManifestNames.end(),
+ catalog.begin(),
+ catalog.end());
}
+ auto manifest_it = manifests.begin();
+ std::vector<FileManifest> output;
+ output.reserve(manifests.size());
+ auto validIvalidInitialManifestNames_it = validInitialManifestNames.begin();
- // put all names of file manifests from disk into a set
- std::set<ndn::Name> loadedManifestNames;
- std::for_each(manifests.begin(), manifests.end(),
- [&loadedManifestNames](const FileManifest& m){
- loadedManifestNames.insert(m.getFullName());
- });
-
- // the set of fileManifests that we have is simply the intersection
- std::set<Name> output;
- std::set_intersection(validManifestNames.begin() , validManifestNames.end(),
- loadedManifestNames.begin(), loadedManifestNames.end(),
- std::inserter(output, output.begin()));
-
- // filter out those manifests that are not in this set
- std::remove_if(manifests.begin(),
- manifests.end(),
- [&output](const FileManifest& m) {
- return (output.end() == output.find(m.name()));
- });
-
- // order the manifests in the same order they are in the torrent
- std::vector<Name> catalogNames;
- for (const auto& segment : torrentSegments) {
- const auto& catalog = segment.getCatalog();
- catalogNames.insert(catalogNames.end(), catalog.begin(), catalog.end());
- }
- size_t curr_index = 0;
- for (auto name : catalogNames) {
- auto it = std::find_if(manifests.begin(), manifests.end(),
- [&name](const FileManifest& m) {
- return m.getFullName() == name;
- });
- if (it != manifests.end()) {
- // not already in the correct position
- if (it != manifests.begin() + curr_index) {
- std::swap(manifests[curr_index], *it);
+ for (auto& initialName : validInitialManifestNames) {
+ // starting from the initial segment
+ auto& validName = initialName;
+ if (manifests.end() == manifest_it) {
+ break;
+ }
+ auto fileName = manifest_it->file_name();
+ // sequentially collect all valid segments
+ while (manifest_it != manifests.end() && manifest_it->getFullName() == validName) {
+ output.push_back(*manifest_it);
+ if (manifest_it->submanifest_ptr() != nullptr) {
+ validName = *manifest_it->submanifest_ptr();
+ ++manifest_it;
}
- ++curr_index;
+ else {
+ ++manifest_it;
+ break;
+ }
+ }
+ // skip the remaining segments for this file (all invalid)
+ while (manifests.end() != manifest_it && manifest_it->file_name() == fileName) {
+ ++manifest_it;
}
}
-
- return manifests;
+ return output;
}
static vector<Data>
-intializeDataPackets(const string& filePath,
- const FileManifest manifest,
- const TorrentFile& torrentFile)
+initializeDataPackets(const string& filePath,
+ const FileManifest manifest,
+ size_t subManifestSize)
{
vector<Data> packets;
auto subManifestNum = manifest.submanifest_number();
packets = packetize_file(filePath,
- manifest.name(),
- manifest.data_packet_size(),
- manifest.catalog().size(),
- subManifestNum);
+ manifest.name(),
+ manifest.data_packet_size(),
+ subManifestSize,
+ subManifestNum);
auto catalog = manifest.catalog();
-
// Filter out invalid packet names
std::remove_if(packets.begin(), packets.end(),
[&packets, &catalog](const Data& p) {
@@ -243,23 +267,36 @@
}
static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
-initializeFileState(const string& dataPath,
- const FileManifest& manifest)
+initializeFileState(const string& dataPath,
+ const FileManifest& manifest,
+ size_t subManifestSize)
{
// construct the file name
auto fileName = manifest.file_name();
auto filePath = dataPath + fileName;
vector<bool> fileBitMap(manifest.catalog().size());
- auto fbits = fs::fstream::out | fs::fstream::binary;
- // if file exists, use in O/W use concatenate mode
- fbits |= fs::exists(filePath) ? fs::fstream::in : fs::fstream::ate;
- auto s = std::make_shared<fs::fstream>(filePath, fbits);
- if (!*s) {
- BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + dataPath));
+ // if the file does not exist, create an empty placeholder (otherwise cannot set read-bit)
+ if (!fs::exists(filePath)) {
+ fs::ofstream fs(filePath);
+ fs << "";
}
+ auto s = std::make_shared<fs::fstream>(filePath,
+ fs::fstream::out
+ | fs::fstream::binary
+ | fs::fstream::in);
+ if (!*s) {
+ BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + filePath));
+ }
+ auto start_offset = manifest.submanifest_number() * subManifestSize * manifest.data_packet_size();
+ s->seekg(start_offset);
+ s->seekp(start_offset);
return std::make_pair(s, fileBitMap);
}
+//==================================================================================================
+// TorrentManager Implementation
+//==================================================================================================
+
void TorrentManager::Initialize()
{
// .../<torrent_name>/torrent-file/<implicit_digest>
@@ -276,30 +313,38 @@
return;
}
m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
- auto currTorrentFile_it = m_torrentSegments.begin();
+
+ // get the submanifest sizes
for (const auto& m : m_fileManifests) {
- // find the appropriate torrent file
- auto currCatalog = currTorrentFile_it->getCatalog();
- while (currCatalog.end() == std::find(currCatalog.begin(), currCatalog.end(), m.getFullName()))
- {
- ++currTorrentFile_it;
- currCatalog = currTorrentFile_it->getCatalog();
+ if (m.submanifest_number() == 0) {
+ auto manifestFileName = m.file_name();
+ m_subManifestSizes[manifestFileName] = m.catalog().size();
}
+ }
+
+ for (const auto& m : m_fileManifests) {
// construct the file name
auto fileName = m.file_name();
fs::path filePath = m_dataPath + fileName;
// If there are any valid packets, add corresponding state to manager
if (!fs::exists(filePath)) {
- boost::filesystem::create_directories(filePath.parent_path());
+ if (!fs::exists(filePath.parent_path())) {
+ boost::filesystem::create_directories(filePath.parent_path());
+ }
continue;
}
- auto packets = intializeDataPackets(filePath.string(), m, *currTorrentFile_it);
+ auto packets = initializeDataPackets(filePath.string(), m, m_subManifestSizes[m.file_name()]);
if (!packets.empty()) {
- m_fileStates[m.getFullName()] = initializeFileState(m_dataPath, m);
+ m_fileStates[m.getFullName()] = initializeFileState(m_dataPath,
+ m,
+ m_subManifestSizes[m.file_name()]);
auto& fileBitMap = m_fileStates[m.getFullName()].second;
auto read_it = packets.begin();
size_t i = 0;
for (auto name : m.catalog()) {
+ if (read_it == packets.end()) {
+ break;
+ }
if (name == read_it->getFullName()) {
++read_it;
fileBitMap[i] = true;
@@ -315,10 +360,184 @@
seed(t);
}
for (const auto& m : m_fileManifests) {
- seed(m);
+ seed(m);
}
}
+std::vector<Name>
+TorrentManager::downloadTorrentFile(const std::string& path)
+{
+ shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
+ auto manifestNames = make_shared<std::vector<Name>>();
+ if (searchRes == nullptr) {
+ this->findFileManifestsToDownload(*manifestNames);
+ if (manifestNames->empty()) {
+ auto packetNames = make_shared<std::vector<Name>>();
+ this->findAllMissingDataPackets(*packetNames);
+ return *packetNames;
+ }
+ else {
+ return *manifestNames;
+ }
+ }
+ this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
+ false, {}, {});
+ return *manifestNames;
+}
+
+void
+TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
+ const std::string& path,
+ std::shared_ptr<std::vector<Name>> manifestNames,
+ bool async,
+ TorrentFileReceivedCallback onSuccess,
+ FailedCallback onFailed)
+{
+ shared_ptr<Interest> interest = createInterest(name);
+
+ auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
+ (const Interest& interest, const Data& data) {
+ // Stats Table update here...
+ m_stats_table_iter->incrementReceivedData();
+ m_retries = 0;
+
+ if (async) {
+ manifestNames->clear();
+ }
+
+ TorrentFile file(data.wireEncode());
+
+ // Write the torrent file segment to disk...
+ if (writeTorrentSegment(file, path)) {
+ // if successfully written, seed this data
+ seed(data);
+ }
+
+ const std::vector<Name>& manifestCatalog = file.getCatalog();
+ manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
+
+ shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
+
+ if (async) {
+ onSuccess(*manifestNames);
+ }
+ if (nextSegmentPtr != nullptr) {
+ this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
+ async, onSuccess, onFailed);
+ }
+ };
+
+ auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
+ (const Interest& interest) {
+ ++m_retries;
+ if (m_retries >= MAX_NUM_OF_RETRIES) {
+ ++m_stats_table_iter;
+ if (m_stats_table_iter == m_statsTable.end()) {
+ m_stats_table_iter = m_statsTable.begin();
+ }
+ }
+ if (async) {
+ onFailed(interest.getName(), "Unknown error");
+ }
+ this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
+ };
+
+ m_face->expressInterest(*interest, dataReceived, dataFailed);
+
+ if (!async) {
+ m_face->processEvents();
+ }
+}
+
+void
+TorrentManager::downloadTorrentFile(const std::string& path,
+ TorrentFileReceivedCallback onSuccess,
+ FailedCallback onFailed)
+{
+ shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
+ auto manifestNames = make_shared<std::vector<Name>>();
+ if (searchRes == nullptr) {
+ this->findFileManifestsToDownload(*manifestNames);
+ if (manifestNames->empty()) {
+ auto packetNames = make_shared<std::vector<Name>>();
+ this->findAllMissingDataPackets(*packetNames);
+ onSuccess(*packetNames);
+ return;
+ }
+ else {
+ onSuccess(*manifestNames);
+ return;
+ }
+ }
+ this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
+ true, onSuccess, onFailed);
+}
+
+void
+TorrentManager::download_file_manifest(const Name& manifestName,
+ const std::string& path,
+ TorrentManager::ManifestReceivedCallback onSuccess,
+ TorrentManager::FailedCallback onFailed)
+{
+ shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
+ auto packetNames = make_shared<std::vector<Name>>();
+ if (searchRes == nullptr) {
+ this->findDataPacketsToDownload(manifestName, *packetNames);
+ onSuccess(*packetNames);
+ return;
+ }
+ this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
+}
+
+void
+TorrentManager::download_data_packet(const Name& packetName,
+ DataReceivedCallback onSuccess,
+ FailedCallback onFailed)
+{
+ if (this->dataAlreadyDownloaded(packetName)) {
+ onSuccess(packetName);
+ return;
+ }
+
+ shared_ptr<Interest> interest = this->createInterest(packetName);
+
+ auto dataReceived = [onSuccess, onFailed, this]
+ (const Interest& interest, const Data& data) {
+ // Write data to disk...
+ if(writeData(data)) {
+ seed(data);
+ }
+
+ // Stats Table update here...
+ m_stats_table_iter->incrementReceivedData();
+ m_retries = 0;
+ onSuccess(data.getName());
+ };
+ auto dataFailed = [onFailed, this]
+ (const Interest& interest) {
+ m_retries++;
+ if (m_retries >= MAX_NUM_OF_RETRIES) {
+ m_stats_table_iter++;
+ if (m_stats_table_iter == m_statsTable.end())
+ m_stats_table_iter = m_statsTable.begin();
+ }
+ onFailed(interest.getName(), "Unknown failure");
+ };
+
+ m_face->expressInterest(*interest, dataReceived, dataFailed);
+}
+
+void TorrentManager::seed(const Data& data) {
+ m_face->setInterestFilter(data.getFullName(),
+ bind(&TorrentManager::onInterestReceived, this, _1, _2),
+ RegisterPrefixSuccessCallback(),
+ bind(&TorrentManager::onRegisterFailed, this, _1, _2));
+}
+
+// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
+// Protected Helpers
+// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
+
bool TorrentManager::writeData(const Data& packet)
{
// find correct manifest
@@ -332,18 +551,28 @@
}
// get file state out
auto& fileState = m_fileStates[manifest_it->getFullName()];
+
// if there is no open stream to the file
if (nullptr == fileState.first) {
- fileState = initializeFileState(m_dataPath, *manifest_it);
+ fs::path filePath = m_dataPath + manifest_it->file_name();
+ if (!fs::exists(filePath)) {
+ fs::create_directories(filePath.parent_path());
+ }
+
+ fileState = initializeFileState(m_dataPath,
+ *manifest_it,
+ m_subManifestSizes[manifest_it->file_name()]);
}
auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
// if we already have the packet, do not rewrite it.
if (fileState.second[packetNum]) {
return false;
}
- auto packetOffset = packetNum * manifest_it->data_packet_size();
+ auto dataPacketSize = manifest_it->data_packet_size();
+ auto initial_offset = manifest_it->submanifest_number() * m_subManifestSizes[manifest_it->file_name()] * dataPacketSize;
+ auto packetOffset = initial_offset + packetNum * dataPacketSize;
// write data to disk
- fileState.first->seekg(packetOffset);
+ fileState.first->seekp(packetOffset);
try {
auto content = packet.getContent();
std::vector<char> data(content.value_begin(), content.value_end());
@@ -419,144 +648,20 @@
}
io::save(manifest, filename.string());
// add to collection
- // add to collection
auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
[&manifest](const FileManifest& m){
return m.file_name() > manifest.file_name()
|| (m.file_name() == manifest.file_name()
&& (m.submanifest_number() > manifest.submanifest_number()));
});
+ // update the state of the manager
m_fileManifests.insert(it, manifest);
+ if (0 == manifest.submanifest_number()) {
+ m_subManifestSizes[manifest.file_name()] = manifest.catalog().size();
+ }
return true;
}
-void TorrentManager::seed(const Data& data) const {
- // TODO(msweatt) IMPLEMENT ME
-}
-
-void
-TorrentManager::downloadTorrentFile(const std::string& path,
- TorrentFileReceivedCallback onSuccess,
- FailedCallback onFailed)
-{
- shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
- auto manifestNames = make_shared<std::vector<Name>>();
- if (searchRes == nullptr) {
- this->findFileManifestsToDownload(*manifestNames);
- if (manifestNames->empty()) {
- auto packetNames = make_shared<std::vector<Name>>();
- this->findAllMissingDataPackets(*packetNames);
- onSuccess(*packetNames);
- return;
- }
- else {
- onSuccess(*manifestNames);
- return;
- }
- }
- this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
- true, onSuccess, onFailed);
-}
-
-std::vector<Name>
-TorrentManager::downloadTorrentFile(const std::string& path)
-{
- shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
- auto manifestNames = make_shared<std::vector<Name>>();
- if (searchRes == nullptr) {
- this->findFileManifestsToDownload(*manifestNames);
- if (manifestNames->empty()) {
- auto packetNames = make_shared<std::vector<Name>>();
- this->findAllMissingDataPackets(*packetNames);
- return *packetNames;
- }
- else {
- return *manifestNames;
- }
- }
- this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
- false, {}, {});
- return *manifestNames;
-}
-
-void
-TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
- const std::string& path,
- std::shared_ptr<std::vector<Name>> manifestNames,
- bool async,
- TorrentFileReceivedCallback onSuccess,
- FailedCallback onFailed)
-{
- shared_ptr<Interest> interest = createInterest(name);
-
- auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
- (const Interest& interest, const Data& data) {
- // Stats Table update here...
- m_stats_table_iter->incrementReceivedData();
- m_retries = 0;
-
- if (async) {
- manifestNames->clear();
- }
-
- TorrentFile file(data.wireEncode());
-
- // Write the torrent file segment to disk...
- writeTorrentSegment(file, path);
-
- const std::vector<Name>& manifestCatalog = file.getCatalog();
- manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
-
- shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
-
- if (async) {
- onSuccess(*manifestNames);
- }
- if (nextSegmentPtr != nullptr) {
- this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
- async, onSuccess, onFailed);
- }
- };
-
- auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
- (const Interest& interest) {
- ++m_retries;
- if (m_retries >= MAX_NUM_OF_RETRIES) {
- ++m_stats_table_iter;
- if (m_stats_table_iter == m_statsTable.end()) {
- m_stats_table_iter = m_statsTable.begin();
- }
- }
- if (async) {
- onFailed(interest.getName(), "Unknown error");
- }
- this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
- };
-
- m_face.expressInterest(*interest, dataReceived, dataFailed);
-
- if (!async) {
- m_face.processEvents();
- }
-}
-
-
-void
-TorrentManager::download_file_manifest(const Name& manifestName,
- const std::string& path,
- TorrentManager::ManifestReceivedCallback onSuccess,
- TorrentManager::FailedCallback onFailed)
-{
- shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
- auto packetNames = make_shared<std::vector<Name>>();
- if (searchRes == nullptr) {
- this->findDataPacketsToDownload(manifestName, *packetNames);
- onSuccess(*packetNames);
- return;
- }
- this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
-}
-
void
TorrentManager::downloadFileManifestSegment(const Name& manifestName,
const std::string& path,
@@ -575,7 +680,9 @@
FileManifest file(data.wireEncode());
// Write the file manifest segment to disk...
- writeFileManifest(file, path);
+ if( writeFileManifest(file, path)) {
+ seed(file);
+ }
const std::vector<Name>& packetsCatalog = file.catalog();
packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
@@ -598,71 +705,78 @@
onFailed(interest.getName(), "Unknown failure");
};
- m_face.expressInterest(*interest, dataReceived, dataFailed);
+ m_face->expressInterest(*interest, dataReceived, dataFailed);
}
void
-TorrentManager::download_data_packet(const Name& packetName,
- DataReceivedCallback onSuccess,
- FailedCallback onFailed)
+TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
{
- if (this->dataAlreadyDownloaded(packetName)) {
- onSuccess(packetName);
- return;
+ // handle if it is a torrent-file
+ const auto& interestName = interest.getName();
+ std::shared_ptr<Data> data = nullptr;
+ auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
+
+ // determine if it is torrent file (that we have)
+ auto torrent_it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(), cmp);
+ if (m_torrentSegments.end() != torrent_it) {
+ data = std::make_shared<Data>(*torrent_it);
}
-
- shared_ptr<Interest> interest = this->createInterest(packetName);
-
- auto dataReceived = [onSuccess, onFailed, this]
- (const Interest& interest, const Data& data) {
- // Write data to disk...
- writeData(data);
-
- // Stats Table update here...
- m_stats_table_iter->incrementReceivedData();
- m_retries = 0;
- onSuccess(data.getName());
- };
- auto dataFailed = [onFailed, this]
- (const Interest& interest) {
- m_retries++;
- if (m_retries >= MAX_NUM_OF_RETRIES) {
- m_stats_table_iter++;
- if (m_stats_table_iter == m_statsTable.end())
- m_stats_table_iter = m_statsTable.begin();
+ else {
+ // determine if it is manifest (that we have)
+ auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(), cmp);
+ if (m_fileManifests.end() != manifest_it) {
+ data = std::make_shared<Data>(*manifest_it) ;
}
- onFailed(interest.getName(), "Unknown failure");
- };
-
- m_face.expressInterest(*interest, dataReceived, dataFailed);
+ else {
+ // determine if it is data packet (that we have)
+ auto manifestName = interestName.getSubName(0, interestName.size() - 2);
+ auto map_it = std::find_if(m_fileStates.begin(), m_fileStates.end(),
+ [&manifestName](const std::pair<Name,
+ std::pair<std::shared_ptr<fs::fstream>,
+ std::vector<bool>>>& kv){
+ return manifestName.isPrefixOf(kv.first);
+ });
+ if (m_fileStates.end() != map_it) {
+ auto packetName = interestName.getSubName(0, interestName.size() - 1);
+ // get out the bitmap to be sure we have the packet
+ auto& fileState = map_it->second;
+ const auto &bitmap = fileState.second;
+ auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
+ if (bitmap[packetNum]) {
+ // get the manifest
+ auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
+ [&manifestName](const FileManifest& m) {
+ return manifestName.isPrefixOf(m.name());
+ });
+ auto manifestFileName = manifest_it->file_name();
+ auto filePath = m_dataPath + manifestFileName;
+ // TODO(msweatt) Explore why fileState stream does not work
+ fs::fstream is (filePath, fs::fstream::in | fs::fstream::binary);
+ data = readDataPacket(interestName,
+ *manifest_it,
+ m_subManifestSizes[manifestFileName],
+ is);
+ }
+ }
+ }
+ }
+ if (nullptr != data) {
+ m_face->put(*data);
+ }
+ else {
+ // TODO(msweatt) NACK
+ std::cerr << "NACK: " << interest << std::endl;
+ }
+ return;
}
-shared_ptr<Interest>
-TorrentManager::createInterest(Name name)
+void
+TorrentManager::onRegisterFailed(const Name& prefix, const std::string& reason)
{
- shared_ptr<Interest> interest = make_shared<Interest>(name);
- interest->setInterestLifetime(time::milliseconds(2000));
- interest->setMustBeFresh(true);
-
- // Select routable prefix
- Link link(name, { {1, m_stats_table_iter->getRecordName()} });
- m_keyChain->sign(link, signingWithSha256());
- Block linkWire = link.wireEncode();
-
- // Stats Table update here...
- m_stats_table_iter->incrementSentInterests();
-
- m_sortingCounter++;
- if (m_sortingCounter >= SORTING_INTERVAL) {
- m_sortingCounter = 0;
- m_statsTable.sort();
- m_stats_table_iter = m_statsTable.begin();
- m_retries = 0;
- }
-
- interest->setLink(linkWire);
-
- return interest;
+ std::cerr << "ERROR: Failed to register prefix \""
+ << prefix << "\" in local hub's daemon (" << reason << ")"
+ << std::endl;
+ m_face->shutdown();
}
shared_ptr<Name>
@@ -701,6 +815,24 @@
}
}
+void
+TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
+{
+ std::vector<Name> manifests;
+ // insert the first segment name of all the file manifests to the vector
+ for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
+ manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
+ }
+ // for each file
+ for (const auto& manifestName : manifests) {
+ // find the first (if any) segment we are missing
+ shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
+ if (nullptr != manifestSegmentName) {
+ manifestNames.push_back(*manifestSegmentName);
+ }
+ }
+}
+
bool
TorrentManager::dataAlreadyDownloaded(const Name& dataName)
{
@@ -726,24 +858,6 @@
}
void
-TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
-{
- std::vector<Name> manifests;
- // insert the first segment name of all the file manifests to the vector
- for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
- manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
- }
- // for each file
- for (const auto& manifestName : manifests) {
- // find the first (if any) segment we are missing
- shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
- if (nullptr != manifestSegmentName) {
- manifestNames.push_back(*manifestSegmentName);
- }
- }
-}
-
-void
TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames)
{
auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
@@ -781,5 +895,33 @@
}
}
+shared_ptr<Interest>
+TorrentManager::createInterest(Name name)
+{
+ shared_ptr<Interest> interest = make_shared<Interest>(name);
+ interest->setInterestLifetime(time::milliseconds(2000));
+ interest->setMustBeFresh(true);
+
+ // Select routable prefix
+ Link link(name, { {1, m_stats_table_iter->getRecordName()} });
+ m_keyChain->sign(link, signingWithSha256());
+ Block linkWire = link.wireEncode();
+
+ // Stats Table update here...
+ m_stats_table_iter->incrementSentInterests();
+
+ m_sortingCounter++;
+ if (m_sortingCounter >= SORTING_INTERVAL) {
+ m_sortingCounter = 0;
+ m_statsTable.sort();
+ m_stats_table_iter = m_statsTable.begin();
+ m_retries = 0;
+ }
+
+ interest->setLink(linkWire);
+
+ return interest;
+}
+
} // end ntorrent
-} // end ndn
+} // end ndn
\ No newline at end of file
diff --git a/src/torrent-manager.hpp b/src/torrent-manager.hpp
index 28053f2..997f322 100644
--- a/src/torrent-manager.hpp
+++ b/src/torrent-manager.hpp
@@ -66,20 +66,9 @@
* The behavior is undefined unless Initialize() is called before calling any other method on a
* TorrentManger object.
*/
- TorrentManager(const ndn::Name& torrentFileName,
- const std::string& dataPath,
- ndn::Face& face);
-
- /*
- * \brief Create a new Torrent manager with the specified parameters.
- * @param torrentFileName The full name of the initial segment of the torrent file
- * @param dataPath The path to the location on disk to use for the torrent data
- *
- * The behavior is undefined unless Initialize() is called before calling any other method on a
- * TorrentManger object.
- */
- TorrentManager(const ndn::Name& torrentFileName,
- const std::string& dataPath);
+ TorrentManager(const ndn::Name& torrentFileName,
+ const std::string& dataPath,
+ std::shared_ptr<Face> face = nullptr);
/*
* @brief Initialize the state of this object.
@@ -156,7 +145,7 @@
// Seed the specified 'data' to the network.
void
- seed(const Data& data) const;
+ seed(const Data& data);
protected:
/*
@@ -240,11 +229,21 @@
SORTING_INTERVAL = 100
};
+ void onDataReceived(const Data& data);
+
+ void
+ onInterestReceived(const InterestFilter& filter, const Interest& interest);
+
+ void
+ onRegisterFailed(const Name& prefix, const std::string& reason);
+
// A map from each fileManifest to corresponding file stream on disk and a bitmap of which Data
// packets this manager currently has
mutable std::unordered_map<Name,
std::pair<std::shared_ptr<fs::fstream>,
std::vector<bool>>> m_fileStates;
+ // A map from each file name to the catalog size of its initial (number 0) sub-manifest
+ std::unordered_map<std::string, size_t> m_subManifestSizes;
// The segments of the TorrentFile this manager has
std::vector<TorrentFile> m_torrentSegments;
// The FileManifests this manager has
@@ -322,7 +321,7 @@
// Stats table where routable prefixes are stored
StatsTable m_statsTable;
// Face used for network communication
- Face& m_face;
+ std::shared_ptr<Face> m_face;
// Iterator to the routable prefix that we currently use
StatsTable::iterator m_stats_table_iter;
// Number of retries per routable prefix
@@ -336,7 +335,7 @@
inline
TorrentManager::TorrentManager(const ndn::Name& torrentFileName,
const std::string& dataPath,
- ndn::Face& face)
+ std::shared_ptr<Face> face)
: m_fileStates()
, m_torrentSegments()
, m_fileManifests()
@@ -347,26 +346,9 @@
, m_sortingCounter(0)
, m_keyChain(new KeyChain())
{
- // Hardcoded prefixes for now
- // TODO(Spyros): Think of something more clever to bootstrap...
- m_statsTable.insert("/ucla");
- m_statsTable.insert("/arizona");
- m_stats_table_iter = m_statsTable.begin();
-}
-
-inline
-TorrentManager::TorrentManager(const ndn::Name& torrentFileName,
- const std::string& dataPath)
-: m_fileStates()
-, m_torrentSegments()
-, m_fileManifests()
-, m_torrentFileName(torrentFileName)
-, m_dataPath(dataPath)
-, m_face(*(new ndn::Face()))
-, m_retries(0)
-, m_sortingCounter(0)
-, m_keyChain(new KeyChain())
-{
+ if(face == nullptr) {
+ face = make_shared<Face>();
+ }
// Hardcoded prefixes for now
// TODO(Spyros): Think of something more clever to bootstrap...
m_statsTable.insert("/ucla");