blob: c06db40bdd61b757213bb14e2a6b2c1ce6c2f97e [file] [log] [blame]
#include "torrent-manager.hpp"
#include "file-manifest.hpp"
#include "torrent-file.hpp"
#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>
#include <ndn-cxx/data.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/security/signing-helpers.hpp>
#include <ndn-cxx/util/io.hpp>
#include <algorithm>
#include <iostream>
#include <limits>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
namespace fs = boost::filesystem;
using std::string;
using std::vector;
namespace {
// TODO(msweatt) Move this to a utility
/**
 * @brief Load every decodable object of type T stored beneath a directory.
 *
 * The tree rooted at @p dirPath is walked recursively. The paths of all regular files are
 * collected into an ordered set, so the objects are returned in lexicographic order of their
 * path strings. Files for which ndn::io::load yields a null pointer are skipped silently.
 *
 * @param dirPath  Root directory to scan.
 * @param encoding Encoding passed through to ndn::io::load (defaults to BASE_64).
 * @return The successfully loaded objects; empty when @p dirPath is not a directory.
 */
template<typename T>
static vector<T>
load_directory(const string& dirPath,
               ndn::io::IoEncoding encoding = ndn::io::IoEncoding::BASE_64) {
  vector<T> structures;
  // A path that exists but is not a directory cannot be iterated (the iterator constructor
  // would throw), so treat it the same as a missing directory.
  if (!fs::is_directory(dirPath)) {
    return structures;
  }
  std::set<string> fileNames;
  const fs::recursive_directory_iterator end;
  for (fs::recursive_directory_iterator it(dirPath); it != end; ++it) {
    // sub-directories themselves never hold an object; only their files do
    if (fs::is_regular_file(it->status())) {
      fileNames.insert(it->path().string());
    }
  }
  for (const auto& f : fileNames) {
    auto data_ptr = ndn::io::load<T>(f, encoding);
    if (nullptr != data_ptr) {
      structures.push_back(*data_ptr);
    }
  }
  structures.shrink_to_fit();
  return structures;
}
} // end anonymous
namespace ndn {
namespace ntorrent {
// TODO(msweatt) Move this to a utility
/**
 * @brief Cut the bytes of one sub-manifest out of a file and wrap them in signed Data packets.
 *
 * The sub-manifest starts at byte `subManifestNum * subManifestSize * dataPacketSize` and spans
 * at most `subManifestSize * dataPacketSize` bytes (fewer when the file ends earlier). Each
 * packet is named @p commonPrefix plus a sequence number counted from 0 within this call, and
 * is signed with signingWithSha256().
 *
 * @param filePath        File to read.
 * @param commonPrefix    Name prefix shared by all produced packets.
 * @param dataPacketSize  Payload bytes per packet; must be greater than 0.
 * @param subManifestSize Number of packets in a full sub-manifest.
 * @param subManifestNum  Zero-based index of the sub-manifest to packetize.
 * @return The signed packets, in file order; empty when the sub-manifest lies past end of file.
 * @throws FileManifest::Error when the file cannot be opened or a read fails.
 */
static vector<ndn::Data>
packetize_file(const fs::path& filePath,
               const ndn::Name& commonPrefix,
               size_t dataPacketSize,
               size_t subManifestSize,
               size_t subManifestNum)
{
  BOOST_ASSERT(0 < dataPacketSize);
  const size_t APPROX_BUFFER_SIZE = std::numeric_limits<int>::max(); // 2 * 1024 * 1024 *1024
  // ensure that the buffer always holds whole packets: round the ~2 GB cap up to a multiple
  // of the packet size
  const size_t maxBufferSize =
    APPROX_BUFFER_SIZE % dataPacketSize == 0 ?
    APPROX_BUFFER_SIZE :
    APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize);

  const auto file_size = fs::file_size(filePath);
  const auto start_offset = subManifestNum * subManifestSize * dataPacketSize;
  // determine the number of bytes in this submanifest; a start offset at or past the end of the
  // file leaves nothing to read and must not wrap around in the unsigned subtraction
  const auto remainingFileLength = start_offset < file_size ? file_size - start_offset : 0;
  size_t subManifestLength = subManifestSize * dataPacketSize;
  if (remainingFileLength < subManifestLength) {
    subManifestLength = static_cast<size_t>(remainingFileLength);
  }

  vector<ndn::Data> packets;
  packets.reserve(subManifestLength / dataPacketSize + 1);
  fs::ifstream input(filePath, fs::ifstream::binary);
  if (!input) {
    BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when opening" + filePath.string()));
  }

  vector<char> file_bytes;
  size_t bytes_read = 0;
  input.seekg(start_offset);
  while (input && bytes_read < subManifestLength) {
    // read the next chunk into a buffer that really has that many elements
    const size_t buffer_size = std::min(subManifestLength - bytes_read, maxBufferSize);
    file_bytes.resize(buffer_size);
    input.read(file_bytes.data(), buffer_size);
    const size_t read_size = static_cast<size_t>(input.gcount());
    if (input.bad()) {
      BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when reading" + filePath.string()));
    }
    if (0 == read_size) {
      break;
    }
    bytes_read += read_size;
    // only the bytes that were actually read are turned into packets
    for (size_t i = 0u; i < read_size; i += dataPacketSize) {
      // Build a packet from the data
      Name packetName = commonPrefix;
      packetName.appendSequenceNumber(packets.size());
      Data d(packetName);
      const size_t content_length = std::min(dataPacketSize, read_size - i);
      d.setContent(encoding::makeBinaryBlock(tlv::Content, file_bytes.data() + i, content_length));
      // append to the collection
      packets.push_back(d);
    }
  }
  input.close();
  packets.shrink_to_fit();
  ndn::security::KeyChain key_chain;
  // sign all the packets
  for (auto& p : packets) {
    key_chain.sign(p, signingWithSha256());
  }
  return packets;
}
/**
 * @brief Load the torrent-file segments stored on disk and keep the verified leading chain.
 *
 * Segments are loaded with load_directory, signed with signingWithSha256(), and checked in
 * order: the first must carry @p initialSegmentName as its full name, and every following one
 * must carry the full name its predecessor points to. The result ends at the first mismatch
 * or right after the segment that has no successor pointer; anything loaded beyond that point
 * was never verified and is discarded.
 *
 * @param torrentFilePath    Directory holding the stored torrent-file segments.
 * @param initialSegmentName Expected full name of the first segment.
 * @return The verified segments in chain order (possibly empty).
 */
static vector<TorrentFile>
intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
{
  security::KeyChain key_chain;
  Name currSegmentFullName = initialSegmentName;
  vector<TorrentFile> torrentSegments = load_directory<TorrentFile>(torrentFilePath);
  // Starting with the initial segment name, verify the names, loading next name from torrentSegment
  auto it = torrentSegments.begin();
  for (; it != torrentSegments.end(); ++it) {
    key_chain.sign(*it, signingWithSha256());
    if (it->getFullName() != currSegmentFullName) {
      // chain is broken here: this segment and everything after it is dropped
      break;
    }
    if (nullptr == it->getTorrentFilePtr()) {
      // last segment of the torrent file: keep it, drop whatever follows
      ++it;
      break;
    }
    // load the next full name
    currSegmentFullName = *it->getTorrentFilePtr();
  }
  torrentSegments.erase(it, torrentSegments.end());
  return torrentSegments;
}
/**
 * @brief Load the file manifests stored on disk and keep those listed by the torrent segments.
 *
 * Every manifest found under @p manifestPath is signed with signingWithSha256(). A manifest is
 * kept only when its full name appears in the catalog of one of @p torrentSegments; the kept
 * manifests are returned in the order in which the catalogs list them.
 *
 * @param manifestPath    Directory holding the stored file manifests.
 * @param torrentSegments Torrent-file segments whose catalogs define validity and order.
 * @return The valid manifests, ordered as in the torrent file.
 */
static vector<FileManifest>
intializeFileManifests(const string& manifestPath, vector<TorrentFile> torrentSegments)
{
  security::KeyChain key_chain;
  vector<FileManifest> manifests = load_directory<FileManifest>(manifestPath);
  // sign the manifests
  for (auto& m : manifests) {
    key_chain.sign(m, signingWithSha256());
  }
  // Walk the catalogs in torrent order and move each listed manifest to the front partition
  // [0, curr_index). Searching only the not-yet-placed tail keeps already placed manifests
  // where they are, even if a catalog repeats a name.
  size_t curr_index = 0;
  for (const auto& segment : torrentSegments) {
    for (const auto& name : segment.getCatalog()) {
      auto it = std::find_if(manifests.begin() + curr_index, manifests.end(),
                             [&name](const FileManifest& m) {
                               return m.getFullName() == name;
                             });
      if (it != manifests.end()) {
        // not already in the correct position
        if (it != manifests.begin() + curr_index) {
          std::swap(manifests[curr_index], *it);
        }
        ++curr_index;
      }
    }
  }
  // whatever was never matched is not part of this torrent: actually remove it
  manifests.erase(manifests.begin() + curr_index, manifests.end());
  return manifests;
}
/**
 * @brief Packetize the on-disk file of a manifest and keep only the packets the manifest lists.
 *
 * The file is split with packetize_file, using the manifest's name, data packet size, catalog
 * size and sub-manifest number. A packet survives only if its full name occurs in the
 * manifest's catalog.
 *
 * @param filePath    Path of the file described by @p manifest.
 * @param manifest    Manifest whose catalog lists the valid packet names.
 * @param torrentFile Not used by this function.
 * @return The valid packets, in file order.
 */
static vector<Data>
intializeDataPackets(const string& filePath,
                     const FileManifest manifest,
                     const TorrentFile& torrentFile)
{
  vector<Data> packets = packetize_file(filePath,
                                        manifest.name(),
                                        manifest.data_packet_size(),
                                        manifest.catalog().size(),
                                        manifest.submanifest_number());
  const auto& catalog = manifest.catalog();
  const std::set<Name> validNames(catalog.begin(), catalog.end());
  // Filter out invalid packet names. remove_if only shuffles the survivors to the front; the
  // erase is what actually shrinks the vector.
  packets.erase(std::remove_if(packets.begin(), packets.end(),
                               [&validNames](const Data& p) {
                                 return 0 == validNames.count(p.getFullName());
                               }),
                packets.end());
  return packets;
}
static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
initializeFileState(const string& dataPath,
const FileManifest& manifest)
{
// construct the file name
auto fileName = manifest.file_name();
auto filePath = dataPath + fileName;
vector<bool> fileBitMap(manifest.catalog().size());
auto fbits = fs::fstream::out | fs::fstream::binary;
// if file exists, use in O/W use concatenate mode
fbits |= fs::exists(filePath) ? fs::fstream::in : fs::fstream::ate;
auto s = std::make_shared<fs::fstream>(filePath, fbits);
if (!*s) {
BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + dataPath));
}
return std::make_pair(s, fileBitMap);
}
void TorrentManager::Initialize()
{
// .../<torrent_name>/torrent-file/<implicit_digest>
string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
string manifestPath = dataPath +"/manifests";
string torrentFilePath = dataPath +"/torrent_files";
// get the torrent file segments and manifests that we have.
if (!fs::exists(torrentFilePath)) {
return;
}
m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
if (m_torrentSegments.empty()) {
return;
}
m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
auto currTorrentFile_it = m_torrentSegments.begin();
for (const auto& m : m_fileManifests) {
// find the appropriate torrent file
auto currCatalog = currTorrentFile_it->getCatalog();
while (currCatalog.end() == std::find(currCatalog.begin(), currCatalog.end(), m.getFullName()))
{
++currTorrentFile_it;
currCatalog = currTorrentFile_it->getCatalog();
}
// construct the file name
auto fileName = m.file_name();
fs::path filePath = m_dataPath + fileName;
// If there are any valid packets, add corresponding state to manager
if (!fs::exists(filePath)) {
boost::filesystem::create_directories(filePath.parent_path());
continue;
}
auto packets = intializeDataPackets(filePath.string(), m, *currTorrentFile_it);
if (!packets.empty()) {
m_fileStates[m.getFullName()] = initializeFileState(m_dataPath, m);
auto& fileBitMap = m_fileStates[m.getFullName()].second;
auto read_it = packets.begin();
size_t i = 0;
for (auto name : m.catalog()) {
if (name == read_it->getFullName()) {
++read_it;
fileBitMap[i] = true;
}
++i;
}
for (const auto& d : packets) {
seed(d);
}
}
}
for (const auto& t : m_torrentSegments) {
seed(t);
}
for (const auto& m : m_fileManifests) {
seed(m);
}
}
bool TorrentManager::writeData(const Data& packet)
{
// find correct manifest
const auto& packetName = packet.getName();
auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
[&packetName](const FileManifest& m) {
return m.getName().isPrefixOf(packetName);
});
if (m_fileManifests.end() == manifest_it) {
return false;
}
// get file state out
auto& fileState = m_fileStates[manifest_it->getFullName()];
// if there is no open stream to the file
if (nullptr == fileState.first) {
fileState = initializeFileState(m_dataPath, *manifest_it);
}
auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
// if we already have the packet, do not rewrite it.
if (fileState.second[packetNum]) {
return false;
}
auto packetOffset = packetNum * manifest_it->data_packet_size();
// write data to disk
fileState.first->seekg(packetOffset);
try {
auto content = packet.getContent();
std::vector<char> data(content.value_begin(), content.value_end());
fileState.first->write(&data[0], data.size());
}
catch (io::Error &e) {
std::cerr << e.what() << std::endl;
return false;
}
// update bitmap
fileState.second[packetNum] = true;
return true;
}
/**
 * @brief Persist a torrent-file segment under @p path and remember it in memory.
 *
 * @param segment The torrent-file segment to store.
 * @param path    Directory prefix; the segment is saved to `path + <segment number>`.
 * @return true when the segment was saved and inserted into the in-memory collection; false
 *         when it is not named under this torrent's prefix, is already held in memory, or an
 *         identical copy is already on disk.
 */
bool TorrentManager::writeTorrentSegment(const TorrentFile& segment, const std::string& path)
{
  // The torrent prefix is the torrent-file name without its last component; segments named
  // outside of it do not belong to our torrent.
  const auto ownPrefix = m_torrentFileName.getSubName(0, m_torrentFileName.size() - 1);
  if (!ownPrefix.isPrefixOf(segment.getName())) {
    return false;
  }
  auto segmentNum = segment.getSegmentNumber();

  // nothing to do for a segment we already hold
  const bool alreadyHeld =
    std::find(m_torrentSegments.begin(), m_torrentSegments.end(), segment)
    != m_torrentSegments.end();
  if (alreadyHeld) {
    return false;
  }

  // make sure the target directory is there
  if (!fs::exists(path)) {
    fs::create_directories(path);
  }
  auto segmentFile = path + to_string(segmentNum);

  // an identical copy on disk is left untouched; anything else gets overwritten
  if (fs::exists(segmentFile)) {
    auto onDisk = io::load<TorrentFile>(segmentFile);
    if (nullptr != onDisk && *onDisk == segment) {
      return false;
    }
  }
  io::save(segment, segmentFile);

  // insert in front of the first held segment with a higher segment number
  auto insertPos = m_torrentSegments.begin();
  while (insertPos != m_torrentSegments.end() &&
         !(insertPos->getSegmentNumber() > segmentNum)) {
    ++insertPos;
  }
  m_torrentSegments.insert(insertPos, segment);
  return true;
}
bool TorrentManager::writeFileManifest(const FileManifest& manifest, const std::string& path)
{
auto subManifestNum = manifest.submanifest_number();
fs::path filename = path + manifest.file_name() + "/" + to_string(subManifestNum);
// check if we already have it
if (m_fileManifests.end() != std::find(m_fileManifests.begin(), m_fileManifests.end(),
manifest))
{
return false;
}
// write to disk at path
if (!fs::exists(filename.parent_path())) {
boost::filesystem::create_directories(filename.parent_path());
}
// if there is already a file on disk for this torrent segment, determine if we should override
if (fs::exists(filename)) {
auto submanifestOnDisk_ptr = io::load<FileManifest>(filename.string());
if (nullptr != submanifestOnDisk_ptr && *submanifestOnDisk_ptr == manifest) {
return false;
}
}
io::save(manifest, filename.string());
// add to collection
// add to collection
auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
[&manifest](const FileManifest& m){
return m.file_name() > manifest.file_name()
|| (m.file_name() == manifest.file_name()
&& (m.submanifest_number() > manifest.submanifest_number()));
});
m_fileManifests.insert(it, manifest);
return true;
}
// Placeholder for seeding `data`; presumably meant to make the packet available to other
// peers -- confirm once implemented. The body is currently empty, so calling it has no effect.
void TorrentManager::seed(const Data& data) const {
// TODO(msweatt) IMPLEMENT ME
}
void
TorrentManager::downloadTorrentFile(const std::string& path,
DataReceivedCallback onSuccess,
FailedCallback onFailed)
{
auto manifestNames = make_shared<std::vector<Name>>();
this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
true, onSuccess, onFailed);
}
std::vector<Name>
TorrentManager::downloadTorrentFile(const std::string& path)
{
auto manifestNames = make_shared<std::vector<Name>>();
this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
false, {}, {});
return *manifestNames;
}
/**
 * @brief Request one torrent-file segment and chain on to the segment it points to.
 *
 * On data: the receive counter of the current stats record is incremented, the retry counter
 * is reset, the segment is written via writeTorrentSegment, its catalog is appended to
 * @p manifestNames, @p onSuccess is invoked in async mode, and the next segment (if any) is
 * requested. On failure: the retry counter is incremented (rotating to the next stats record
 * once MAX_NUM_OF_RETRIES is reached), @p onFailed is invoked in async mode, and the same
 * segment is requested again.
 *
 * @param name          Name of the segment to request.
 * @param path          Storage location handed to writeTorrentSegment.
 * @param manifestNames Shared accumulator for the manifest names found in the catalogs.
 * @param async         When false, m_face.processEvents() is run after expressing the interest.
 * @param onSuccess     Called with the segment name after each received segment (async only).
 * @param onFailed      Called with the interest name on each failure (async only).
 */
void
TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
                                           const std::string& path,
                                           std::shared_ptr<std::vector<Name>> manifestNames,
                                           bool async,
                                           DataReceivedCallback onSuccess,
                                           FailedCallback onFailed)
{
  shared_ptr<Interest> request = createInterest(name);

  auto onSegment = [manifestNames, path, async, onSuccess, onFailed, this]
                   (const Interest& interest, const Data& data) {
    // Stats Table update here...
    m_stats_table_iter->incrementReceivedData();
    m_retries = 0;
    TorrentFile segment(data.wireEncode());
    // Write the torrent file segment to disk...
    writeTorrentSegment(segment, path);
    const std::vector<Name>& catalog = segment.getCatalog();
    manifestNames->insert(manifestNames->end(), catalog.begin(), catalog.end());
    shared_ptr<Name> next = segment.getTorrentFilePtr();
    if (async) {
      onSuccess(segment.getName());
    }
    if (next == nullptr) {
      return;
    }
    // follow the chain to the next segment
    this->downloadTorrentFileSegment(*next, path, manifestNames, async, onSuccess, onFailed);
  };

  auto onTimeout = [manifestNames, path, name, async, onSuccess, onFailed, this]
                   (const Interest& interest) {
    if (++m_retries >= MAX_NUM_OF_RETRIES) {
      // move on to the next stats record, wrapping around at the end of the table
      ++m_stats_table_iter;
      if (m_stats_table_iter == m_statsTable.end()) {
        m_stats_table_iter = m_statsTable.begin();
      }
    }
    if (async) {
      onFailed(interest.getName(), "Unknown error");
    }
    // ask for the same segment again
    this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
  };

  m_face.expressInterest(*request, onSegment, onTimeout);
  if (!async) {
    m_face.processEvents();
  }
}
void
TorrentManager::download_file_manifest(const Name& manifestName,
const std::string& path,
TorrentManager::ManifestReceivedCallback onSuccess,
TorrentManager::FailedCallback onFailed)
{
auto packetNames = make_shared<std::vector<Name>>();
this->downloadFileManifestSegment(manifestName, path, packetNames, onSuccess, onFailed);
}
void
TorrentManager::downloadFileManifestSegment(const Name& manifestName,
const std::string& path,
std::shared_ptr<std::vector<Name>> packetNames,
TorrentManager::ManifestReceivedCallback onSuccess,
TorrentManager::FailedCallback onFailed)
{
shared_ptr<Interest> interest = this->createInterest(manifestName);
auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
(const Interest& interest, const Data& data) {
// Stats Table update here...
m_stats_table_iter->incrementReceivedData();
m_retries = 0;
FileManifest file(data.wireEncode());
// Write the file manifest segment to disk...
writeFileManifest(file, path);
const std::vector<Name>& packetsCatalog = file.catalog();
packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
shared_ptr<Name> nextSegmentPtr = file.submanifest_ptr();
if (nextSegmentPtr != nullptr) {
this->downloadFileManifestSegment(*nextSegmentPtr, path, packetNames, onSuccess, onFailed);
}
else
onSuccess(*packetNames);
};
auto dataFailed = [packetNames, path, manifestName, onFailed, this]
(const Interest& interest) {
m_retries++;
if (m_retries >= MAX_NUM_OF_RETRIES) {
m_stats_table_iter++;
if (m_stats_table_iter == m_statsTable.end())
m_stats_table_iter = m_statsTable.begin();
}
onFailed(interest.getName(), "Unknown failure");
};
m_face.expressInterest(*interest, dataReceived, dataFailed);
}
void
TorrentManager::download_data_packet(const Name& packetName,
DataReceivedCallback onSuccess,
FailedCallback onFailed)
{
shared_ptr<Interest> interest = this->createInterest(packetName);
auto dataReceived = [onSuccess, onFailed, this]
(const Interest& interest, const Data& data) {
// Write data to disk...
writeData(data);
// Stats Table update here...
m_stats_table_iter->incrementReceivedData();
m_retries = 0;
onSuccess(data.getName());
};
auto dataFailed = [onFailed, this]
(const Interest& interest) {
m_retries++;
if (m_retries >= MAX_NUM_OF_RETRIES) {
m_stats_table_iter++;
if (m_stats_table_iter == m_statsTable.end())
m_stats_table_iter = m_statsTable.begin();
}
onFailed(interest.getName(), "Unknown failure");
};
m_face.expressInterest(*interest, dataReceived, dataFailed);
}
/**
 * @brief Build an Interest for @p name that carries a Link to the current stats record.
 *
 * The Interest gets a 2000 ms lifetime and MustBeFresh. A Link for @p name with a single
 * delegation (preference 1, record name of the current stats-table entry) is signed with
 * signingWithSha256() and attached. The sent-interest counter of the current record is
 * incremented; every SORTING_INTERVAL calls the stats table is sorted, the current record is
 * reset to the first entry and the retry counter is cleared.
 *
 * @param name Name to request.
 * @return The prepared Interest.
 */
shared_ptr<Interest>
TorrentManager::createInterest(Name name)
{
  auto request = make_shared<Interest>(name);
  request->setInterestLifetime(time::milliseconds(2000));
  request->setMustBeFresh(true);

  // Select routable prefix
  Link routingLink(name, { {1, m_stats_table_iter->getRecordName()} });
  m_keyChain->sign(routingLink, signingWithSha256());
  Block encodedLink = routingLink.wireEncode();

  // Stats Table update here...
  m_stats_table_iter->incrementSentInterests();
  if (++m_sortingCounter >= SORTING_INTERVAL) {
    // periodic re-sort: start over from the first record of the sorted table
    m_sortingCounter = 0;
    m_statsTable.sort();
    m_stats_table_iter = m_statsTable.begin();
    m_retries = 0;
  }

  request->setLink(encodedLink);
  return request;
}
} // end ntorrent
} // end ndn