Network-related operations of the TorrentManager class
This commit adds the downloadTorrentFile, download_file_manifest, download_data_packet methods
Change-Id: Ie221bc07907eb58117e83e317eca578174142d6d
Refs: #3505
diff --git a/src/torrent-manager.cpp b/src/torrent-manager.cpp
index afda819..c06db40 100644
--- a/src/torrent-manager.cpp
+++ b/src/torrent-manager.cpp
@@ -434,5 +434,195 @@
// TODO(msweatt) IMPLEMENT ME
}
+void
+TorrentManager::downloadTorrentFile(const std::string& path,
+ DataReceivedCallback onSuccess,
+ FailedCallback onFailed)
+{
+ auto manifestNames = make_shared<std::vector<Name>>();
+ this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
+ true, onSuccess, onFailed);
+}
+
+std::vector<Name>
+TorrentManager::downloadTorrentFile(const std::string& path)
+{
+ auto manifestNames = make_shared<std::vector<Name>>();
+ this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
+ false, {}, {});
+ return *manifestNames;
+}
+
+void
+TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
+ const std::string& path,
+ std::shared_ptr<std::vector<Name>> manifestNames,
+ bool async,
+ DataReceivedCallback onSuccess,
+ FailedCallback onFailed)
+{
+ shared_ptr<Interest> interest = createInterest(name);
+
+ auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
+ (const Interest& interest, const Data& data) {
+ // Stats Table update here...
+ m_stats_table_iter->incrementReceivedData();
+ m_retries = 0;
+
+ TorrentFile file(data.wireEncode());
+
+ // Write the torrent file segment to disk...
+ writeTorrentSegment(file, path);
+
+ const std::vector<Name>& manifestCatalog = file.getCatalog();
+ manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
+
+ shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
+
+ if (async) {
+ onSuccess(file.getName());
+ }
+ if (nextSegmentPtr != nullptr) {
+ this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
+ async, onSuccess, onFailed);
+ }
+ };
+
+ auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
+ (const Interest& interest) {
+ ++m_retries;
+ if (m_retries >= MAX_NUM_OF_RETRIES) {
+ ++m_stats_table_iter;
+ if (m_stats_table_iter == m_statsTable.end()) {
+ m_stats_table_iter = m_statsTable.begin();
+ }
+ }
+ if (async) {
+ onFailed(interest.getName(), "Unknown error");
+ }
+ this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
+ };
+
+ m_face.expressInterest(*interest, dataReceived, dataFailed);
+
+ if (!async) {
+ m_face.processEvents();
+ }
+}
+
+
+void
+TorrentManager::download_file_manifest(const Name& manifestName,
+ const std::string& path,
+ TorrentManager::ManifestReceivedCallback onSuccess,
+ TorrentManager::FailedCallback onFailed)
+{
+ auto packetNames = make_shared<std::vector<Name>>();
+ this->downloadFileManifestSegment(manifestName, path, packetNames, onSuccess, onFailed);
+}
+
+void
+TorrentManager::downloadFileManifestSegment(const Name& manifestName,
+ const std::string& path,
+ std::shared_ptr<std::vector<Name>> packetNames,
+ TorrentManager::ManifestReceivedCallback onSuccess,
+ TorrentManager::FailedCallback onFailed)
+{
+ shared_ptr<Interest> interest = this->createInterest(manifestName);
+
+ auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
+ (const Interest& interest, const Data& data) {
+ // Stats Table update here...
+ m_stats_table_iter->incrementReceivedData();
+ m_retries = 0;
+
+ FileManifest file(data.wireEncode());
+
+ // Write the file manifest segment to disk...
+ writeFileManifest(file, path);
+
+ const std::vector<Name>& packetsCatalog = file.catalog();
+ packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
+ shared_ptr<Name> nextSegmentPtr = file.submanifest_ptr();
+ if (nextSegmentPtr != nullptr) {
+ this->downloadFileManifestSegment(*nextSegmentPtr, path, packetNames, onSuccess, onFailed);
+ }
+ else
+ onSuccess(*packetNames);
+ };
+
+ auto dataFailed = [packetNames, path, manifestName, onFailed, this]
+ (const Interest& interest) {
+ m_retries++;
+ if (m_retries >= MAX_NUM_OF_RETRIES) {
+ m_stats_table_iter++;
+ if (m_stats_table_iter == m_statsTable.end())
+ m_stats_table_iter = m_statsTable.begin();
+ }
+ onFailed(interest.getName(), "Unknown failure");
+ };
+
+ m_face.expressInterest(*interest, dataReceived, dataFailed);
+}
+
+void
+TorrentManager::download_data_packet(const Name& packetName,
+ DataReceivedCallback onSuccess,
+ FailedCallback onFailed)
+{
+ shared_ptr<Interest> interest = this->createInterest(packetName);
+
+ auto dataReceived = [onSuccess, onFailed, this]
+ (const Interest& interest, const Data& data) {
+ // Write data to disk...
+ writeData(data);
+
+ // Stats Table update here...
+ m_stats_table_iter->incrementReceivedData();
+ m_retries = 0;
+ onSuccess(data.getName());
+ };
+ auto dataFailed = [onFailed, this]
+ (const Interest& interest) {
+ m_retries++;
+ if (m_retries >= MAX_NUM_OF_RETRIES) {
+ m_stats_table_iter++;
+ if (m_stats_table_iter == m_statsTable.end())
+ m_stats_table_iter = m_statsTable.begin();
+ }
+ onFailed(interest.getName(), "Unknown failure");
+ };
+
+ m_face.expressInterest(*interest, dataReceived, dataFailed);
+}
+
+shared_ptr<Interest>
+TorrentManager::createInterest(Name name)
+{
+ shared_ptr<Interest> interest = make_shared<Interest>(name);
+ interest->setInterestLifetime(time::milliseconds(2000));
+ interest->setMustBeFresh(true);
+
+ // Select routable prefix
+ Link link(name, { {1, m_stats_table_iter->getRecordName()} });
+ m_keyChain->sign(link, signingWithSha256());
+ Block linkWire = link.wireEncode();
+
+ // Stats Table update here...
+ m_stats_table_iter->incrementSentInterests();
+
+ m_sortingCounter++;
+ if (m_sortingCounter >= SORTING_INTERVAL) {
+ m_sortingCounter = 0;
+ m_statsTable.sort();
+ m_stats_table_iter = m_statsTable.begin();
+ m_retries = 0;
+ }
+
+ interest->setLink(linkWire);
+
+ return interest;
+}
+
} // end ntorrent
-} // end ndn
\ No newline at end of file
+} // end ndn
diff --git a/src/torrent-manager.hpp b/src/torrent-manager.hpp
index fc790a8..a96f21b 100644
--- a/src/torrent-manager.hpp
+++ b/src/torrent-manager.hpp
@@ -21,11 +21,16 @@
#ifndef INCLUDED_TORRENT_FILE_MANAGER_H
#define INCLUDED_TORRENT_FILE_MANAGER_H
-#include "torrent-file.hpp"
#include "file-manifest.hpp"
+#include "stats-table.hpp"
+#include "torrent-file.hpp"
-#include <ndn-cxx/name.hpp>
#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/interest.hpp>
+#include <ndn-cxx/link.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
#include <boost/filesystem/fstream.hpp>
@@ -47,60 +52,118 @@
* \brief A class to manage the interaction with the system in seeding/leaching a torrent
*/
public:
- typedef std::function<void(const ndn::Name&)> DataReceivedCallback;
- typedef std::function<void(const std::vector<ndn::Name>&)> ManifestReceivedCallback;
- typedef std::function<void(const ndn::Name&, const std::string&)> FailedCallback;
+ typedef std::function<void(const ndn::Name&)> DataReceivedCallback;
+ typedef std::function<void(const std::vector<ndn::Name>&)> ManifestReceivedCallback;
+ typedef std::function<void(const ndn::Name&, const std::string&)> FailedCallback;
- TorrentManager(const ndn::Name& torrentFileName,
- const std::string& dataPath);
+ /*
+ * \brief Create a new Torrent manager with the specified parameters.
+ * @param torrentFileName The full name of the initial segment of the torrent file
+ * @param dataPath The path to the location on disk to use for the torrent data
+ * @param face Optional face object to be used for data retrieval
+ *
+ * The behavior is undefined unless Initialize() is called before calling any other method on a
+ * TorrentManger object.
+ */
+ TorrentManager(const ndn::Name& torrentFileName,
+ const std::string& dataPath,
+ ndn::Face& face);
+
+ /*
+ * \brief Create a new Torrent manager with the specified parameters.
+ * @param torrentFileName The full name of the initial segment of the torrent file
+ * @param dataPath The path to the location on disk to use for the torrent data
+ *
+ * The behavior is undefined unless Initialize() is called before calling any other method on a
+ * TorrentManger object.
+ */
+ TorrentManager(const ndn::Name& torrentFileName,
+ const std::string& dataPath);
+
/*
- * \brief Create a new Torrent manager with the specified parameters.
- * @param torrentFileName The full name of the initial segment of the torrent file
- * @param filePath The path to the location on disk to use for the torrent data
+ * @brief Initialize the state of this object.
*
- * The behavior is undefined unless Initialize() is called before calling any other method on a
- * TorrentManger object.
- */
-
- void Initialize();
- /*
- * \brief Initialize the state of this object.
* Read and validate from disk all torrent file segments, file manifests, and data packets for
* the torrent file managed by this object initializing all state in this manager respectively.
* Also seeds all validated data.
+ */
+ void
+ Initialize();
+
+ /*
+ * @brief Download the torrent file
+ * @param path The path to write the downloaded segments
+ * @return A vector of the file manifest names contained in the torrent file
+ *
*/
+ std::vector<Name>
+ downloadTorrentFile(const std::string& path);
- std::vector<Name> downloadTorrentFile(const std::string& path = "");
- // Request from the network the all segments of the 'torrentFileName' and write onto local disk at
- // the specified 'path'.
+ /*
+ * @brief Download the torrent file
+ * @param path The path to write the downloaded segments
+ * @param onSuccess Callback to be called if we successfully download all the
+ * segments of the torrent file. It passes the name of the file manifest
+ * to be downloaded to the callback
+ * @param onFailed Callaback to be called if we fail to download a segment of
+ * the torrent file. It passes the name of the torrent file segment that
+ * failed to download and a failure reason to the callback
+ *
+ * This method provides non-blocking downloading of all the torrent file segments
+ *
+ */
+ void
+ downloadTorrentFile(const std::string& path,
+ DataReceivedCallback onSuccess,
+ FailedCallback onFailed);
+ /*
+ * @brief Download a file manifest
+ * @param manifestName The name of the manifest file to be downloaded
+ * @param path The path to write the downloaded segments
+ * @param onSuccess Callback to be called if we successfully download all the
+ * segments of the file manifest. It passes the names of the data packets
+ * to be downloaded to the callback
+ * @param onFailed Callaback to be called if we fail to download a segment of
+ * the file manifest. It passes the name of the data packet that failed
+ * to download and a failure reason
+ *
+ * This method provides non-blocking downloading of all the file manifest segments
+ *
+ */
void download_file_manifest(const Name& manifestName,
const std::string& path,
ManifestReceivedCallback onSuccess,
- FailedCallback onFailed) const;
- // Request from the network the all segments of the 'manifestName' and write onto local disk at
- // the specified 'path'.
+ FailedCallback onFailed);
+ /*
+ * @brief Download a data packet
+ * @param packetName The name of the data packet to be downloaded
+ * @param onSuccess Callback to be called if we successfully download the data packet.
+ * It passes the name of the data packet to the callback
+ * @param onFailed Callaback to be called if we fail to download the requested data packet
+ * It passes the name of the data packet to the callback and a failure reason
+ *
+ * This method writes the downloaded data packet to m_dataPath on disk
+ *
+ */
void download_data_packet(const Name& packetName,
DataReceivedCallback onSuccess,
- FailedCallback onFailed) const;
- // Request from the network the Data with the specified 'packetName' and write onto local disk at
- // 'm_dataPath'.
+ FailedCallback onFailed);
- void seed(const Data& data) const;
// Seed the specified 'data' to the network.
+ void seed(const Data& data) const;
protected:
- bool writeData(const Data& packet);
/*
* \brief Write @p packet composed of torrent date to disk.
* @param packet The data packet to be written to the disk
* Write the Data packet to disk, return 'true' if data successfully written to disk 'false'
* otherwise. Behavior is undefined unless the corresponding file manifest has already been
* downloaded.
- */
+ */
+ bool writeData(const Data& packet);
- bool writeTorrentSegment(const TorrentFile& segment, const std::string& path);
/*
* \brief Write the @p segment torrent segment to disk at the specified path.
* @param segment The torrent file segment to be written to disk
@@ -109,8 +172,8 @@
* otherwise. Behavior is undefined unless @segment is a correct segment for the torrent file of
* this manager and @p path is the directory used for all segments of this torrent file.
*/
+ bool writeTorrentSegment(const TorrentFile& segment, const std::string& path);
- bool writeFileManifest(const FileManifest& manifest, const std::string& path);
/*
* \brief Write the @p manifest file manifest to disk at the specified @p path.
* @param manifest The file manifest to be written to disk
@@ -120,10 +183,53 @@
* torrent file of this manager and @p path is the directory used for all file manifests of this
* torrent file.
*/
+ bool writeFileManifest(const FileManifest& manifest, const std::string& path);
- void onDataReceived(const Data& data);
+ /*
+ * \brief Download the segments of the torrent file
+ * @param name The name of the torrent file to be downloaded
+ * @param path The path to write the torrent file on disk
+ * @param manifestNames A vector containing the name of the manifests in the torrent file.
+ * This parameter will be updated every time we receive a torrent
+ * file segment
+ * @param async Blocking (sync) or non-blocking (async) operation
+ * @param onSuccess Optional callback to be called when all the segments of the torrent file
+ * have been downloaded. The default value is an empty callack. A callback
+ * should be specified when async is false
+ * @param onFailed Optional callback to be called when we fail to download a segment of the
+ * torrent file. The default value is an empty callack. A callback should be
+ * specified when async is false
+ *
+ */
+ void downloadTorrentFileSegment(const ndn::Name& name,
+ const std::string& path,
+ std::shared_ptr<std::vector<Name>> manifestNames,
+ bool async,
+ DataReceivedCallback onSuccess,
+ FailedCallback onFailed);
- void onInterestReceived(const Name& name);
+ /*
+ * \brief Download the segments of a file manifest
+ * @param manifestName The name of the file manifest to be downloaded
+ * @param path The path to write the file manifest on disk
+ * @param packetNames A vector containing the name of the data packets in the file manifest
+ * @param onSuccess Callback to be called when all the segments of the file manifest have been
+ * downloaded
+ * @param onFailed Callback to be called when we fail to download a file manifest segment
+ *
+ */
+ void downloadFileManifestSegment(const Name& manifestName,
+ const std::string& path,
+ std::shared_ptr<std::vector<Name>> packetNames,
+ TorrentManager::ManifestReceivedCallback onSuccess,
+ TorrentManager::FailedCallback onFailed);
+
+ enum {
+ // Number of times to retry if a routable prefix fails to retrieve data
+ MAX_NUM_OF_RETRIES = 5,
+ // Number of Interests to be sent before sorting the stats table
+ SORTING_INTERVAL = 100
+ };
// A map from each fileManifest to corresponding file stream on disk and a bitmap of which Data
// packets this manager currently has
@@ -138,20 +244,67 @@
Name m_torrentFileName;
// The path to the location on disk of the Data packet for this manager
std::string m_dataPath;
+
+private:
+ shared_ptr<Interest>
+ createInterest(Name name);
+
+ // Stats table where routable prefixes are stored
+ StatsTable m_statsTable;
+ // Face used for network communication
+ Face& m_face;
+ // Iterator to the routable prefix that we currently use
+ StatsTable::iterator m_stats_table_iter;
+ // Number of retries per routable prefix
+ uint64_t m_retries;
+ // Number of Interests sent since last sorting
+ uint64_t m_sortingCounter;
+ // Keychain instance
+ unique_ptr<KeyChain> m_keyChain;
};
inline
-TorrentManager::TorrentManager(const ndn::Name& torrentFileName,
- const std::string& dataPath)
+TorrentManager::TorrentManager(const ndn::Name& torrentFileName,
+ const std::string& dataPath,
+ ndn::Face& face)
: m_fileStates()
, m_torrentSegments()
, m_fileManifests()
, m_torrentFileName(torrentFileName)
, m_dataPath(dataPath)
+, m_face(face)
+, m_retries(0)
+, m_sortingCounter(0)
+, m_keyChain(new KeyChain())
{
+ // Hardcoded prefixes for now
+ // TODO(Spyros): Think of something more clever to bootstrap...
+ m_statsTable.insert("/ucla");
+ m_statsTable.insert("/arizona");
+ m_stats_table_iter = m_statsTable.begin();
+}
+
+inline
+TorrentManager::TorrentManager(const ndn::Name& torrentFileName,
+ const std::string& dataPath)
+: m_fileStates()
+, m_torrentSegments()
+, m_fileManifests()
+, m_torrentFileName(torrentFileName)
+, m_dataPath(dataPath)
+, m_face(*(new ndn::Face()))
+, m_retries(0)
+, m_sortingCounter(0)
+, m_keyChain(new KeyChain())
+{
+ // Hardcoded prefixes for now
+ // TODO(Spyros): Think of something more clever to bootstrap...
+ m_statsTable.insert("/ucla");
+ m_statsTable.insert("/arizona");
+ m_stats_table_iter = m_statsTable.begin();
}
} // end ntorrent
} // end ndn
-#endif // INCLUDED_TORRENT_FILE_MANAGER_H
\ No newline at end of file
+#endif // INCLUDED_TORRENT_FILE_MANAGER_H