// blob: f62491fa7b73cc6f67d928ff9ab56fbfdb65028c [file] [log] [blame]
#include "torrent-manager.hpp"
#include "file-manifest.hpp"
#include "torrent-file.hpp"
#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>
#include <ndn-cxx/data.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/security/signing-helpers.hpp>
#include <ndn-cxx/util/io.hpp>
#include <algorithm>
#include <iostream>
#include <iterator>
#include <limits>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
namespace fs = boost::filesystem;
using std::string;
using std::vector;
namespace {
// TODO(msweatt) Move this to a utility
/**
 * Load every entry of a directory as an object of type T.
 *
 * Each directory entry is handed to ndn::io::load<T>() with the given
 * encoding; entries for which the loader returns a null pointer are skipped.
 *
 * @param dirPath   Directory to scan. A path that does not exist yields an
 *                  empty result.
 * @param encoding  Encoding passed through to ndn::io::load.
 * @return The successfully loaded objects, in directory-iteration order.
 */
template<typename T>
static vector<T>
load_directory(const string& dirPath,
               ndn::io::IoEncoding encoding = ndn::io::IoEncoding::BASE_64)
{
  vector<T> loaded;
  if (!fs::exists(dirPath)) {
    return loaded;
  }

  const fs::directory_iterator endOfDirectory;
  fs::directory_iterator entry(dirPath);
  while (entry != endOfDirectory) {
    const auto decoded = ndn::io::load<T>(entry->path().string(), encoding);
    if (decoded != nullptr) {
      loaded.push_back(*decoded);
    }
    ++entry;
  }

  loaded.shrink_to_fit();
  return loaded;
}
} // end anonymous
namespace ndn {
namespace ntorrent {
// TODO(msweatt) Move this to a utility
/**
 * Split one sub-manifest's worth of a file into signed Data packets.
 *
 * The byte range covered starts at
 *   subManifestNum * subManifestSize * dataPacketSize
 * and spans at most subManifestSize * dataPacketSize bytes (less if the file
 * ends first). Packet i is named `commonPrefix` + sequence-number(i), carries
 * up to dataPacketSize bytes of content and is signed with signingWithSha256().
 *
 * @param filePath         File to read.
 * @param commonPrefix     Name prefix shared by every produced packet.
 * @param dataPacketSize   Content bytes per packet; must be > 0.
 * @param subManifestSize  Number of packets per sub-manifest.
 * @param subManifestNum   Index of the sub-manifest to packetize.
 * @return The packets, in file order. Empty if the start offset is at or
 *         beyond the end of the file.
 * @throws FileManifest::Error if the file cannot be opened or a read fails.
 */
static vector<ndn::Data>
packetize_file(const fs::path& filePath,
               const ndn::Name& commonPrefix,
               size_t dataPacketSize,
               size_t subManifestSize,
               size_t subManifestNum)
{
  BOOST_ASSERT(0 < dataPacketSize);
  const size_t APPROX_BUFFER_SIZE = std::numeric_limits<int>::max(); // 2 * 1024 * 1024 *1024
  const auto file_size = fs::file_size(filePath);
  const auto start_offset = subManifestNum * subManifestSize * dataPacketSize;

  fs::ifstream input(filePath, fs::ifstream::binary);
  if (!input) {
    BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when opening" + filePath.string()));
  }

  vector<ndn::Data> packets;
  // Nothing to read: avoids the unsigned underflow of file_size - start_offset.
  if (file_size <= start_offset) {
    return packets;
  }

  // determine the number of bytes in this submanifest
  const auto remainingFileLength = file_size - start_offset;
  size_t subManifestLength = subManifestSize * dataPacketSize;
  if (remainingFileLength < subManifestLength) {
    subManifestLength = static_cast<size_t>(remainingFileLength);
  }
  packets.reserve(subManifestLength / dataPacketSize + 1);

  // Largest chunk read at once: the smallest whole number of data packets
  // that is >= ~2 GB, so a chunk boundary never splits a packet.
  const size_t overshoot = APPROX_BUFFER_SIZE % dataPacketSize;
  const size_t max_chunk_size =
    overshoot == 0 ? APPROX_BUFFER_SIZE : APPROX_BUFFER_SIZE + dataPacketSize - overshoot;

  vector<char> file_bytes;
  size_t bytes_read = 0;
  input.seekg(start_offset);
  while (input && bytes_read < subManifestLength && !input.eof()) {
    // Never request more than what is left of this submanifest.
    const size_t chunk_size = std::min(subManifestLength - bytes_read, max_chunk_size);
    // resize (not reserve): the stream writes into the elements, so they must exist.
    file_bytes.resize(chunk_size);
    input.read(file_bytes.data(), static_cast<std::streamsize>(chunk_size));
    const auto read_size = input.gcount();
    if (input.bad() || read_size < 0) {
      BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when reading" + filePath.string()));
    }
    if (read_size == 0) {
      break;
    }
    const size_t chunk_read = static_cast<size_t>(read_size);
    bytes_read += chunk_read;

    // Only packetize the bytes that were actually read.
    for (size_t i = 0u; i < chunk_read; i += dataPacketSize) {
      Name packetName = commonPrefix;
      packetName.appendSequenceNumber(packets.size());
      const size_t content_length = std::min(dataPacketSize, chunk_read - i);
      packets.emplace_back(packetName);
      packets.back().setContent(
        encoding::makeBinaryBlock(tlv::Content, file_bytes.data() + i, content_length));
    }
  }
  input.close();
  packets.shrink_to_fit();

  // sign all the packets
  ndn::security::KeyChain key_chain;
  for (auto& p : packets) {
    key_chain.sign(p, signingWithSha256());
  }
  return packets;
}
/**
 * Load the torrent-file segments stored in a directory and keep the leading
 * run that forms a valid chain starting at `initialSegmentName`.
 *
 * Segments are visited in the order load_directory() returned them. Each one
 * is signed with signingWithSha256() and its full name compared to the
 * expected name; on the first mismatch that segment and everything after it
 * are dropped. The next expected name is taken from the segment's
 * torrent-file pointer; a null pointer ends the walk.
 *
 * NOTE(review): when the walk ends on a null pointer, any segments that
 * follow in the vector are left in place unverified - confirm this is intended.
 *
 * @param torrentFilePath     Directory holding the encoded segments.
 * @param initialSegmentName  Full name the first segment must have.
 * @return The retained segments.
 */
static vector<TorrentFile>
intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
{
  security::KeyChain signer;
  Name expectedFullName = initialSegmentName;
  vector<TorrentFile> segments = load_directory<TorrentFile>(torrentFilePath);

  for (size_t idx = 0; idx < segments.size(); ++idx) {
    TorrentFile& current = segments[idx];
    signer.sign(current, signingWithSha256());

    if (current.getFullName() != expectedFullName) {
      // chain is broken here: keep only the segments verified so far
      segments.erase(segments.begin() + idx, segments.end());
      break;
    }

    const auto nextName = current.getTorrentFilePtr();
    if (nextName == nullptr) {
      break;
    }
    expectedFullName = *nextName;
  }
  return segments;
}
/**
 * Load the file manifests stored in a directory, keep only those listed in
 * the catalogs of the given torrent segments, and order them as the
 * catalogs list them.
 *
 * Every loaded manifest is signed with signingWithSha256() before its full
 * name is compared against the catalog entries.
 *
 * @param manifestPath     Directory holding the encoded manifests.
 * @param torrentSegments  Torrent segments whose catalogs name the valid
 *                         manifests.
 * @return The valid manifests, in catalog order.
 */
static vector<FileManifest>
intializeFileManifests(const string& manifestPath, vector<TorrentFile> torrentSegments)
{
  security::KeyChain key_chain;
  vector<FileManifest> manifests = load_directory<FileManifest>(manifestPath);
  // sign the manifests
  for (auto& m : manifests) {
    key_chain.sign(m, signingWithSha256());
  }

  // all manifest names from the torrent segments, in torrent order
  std::vector<Name> catalogNames;
  for (const auto& segment : torrentSegments) {
    const auto& catalog = segment.getCatalog();
    catalogNames.insert(catalogNames.end(), catalog.begin(), catalog.end());
  }
  const std::set<Name> validManifestNames(catalogNames.begin(), catalogNames.end());

  // Drop manifests the torrent does not list. remove_if only reorders, so the
  // tail must be erased; catalogs hold full names, so compare full names.
  manifests.erase(std::remove_if(manifests.begin(), manifests.end(),
                                 [&validManifestNames](const FileManifest& m) {
                                   return validManifestNames.count(m.getFullName()) == 0;
                                 }),
                  manifests.end());

  // order the manifests in the same order they are in the torrent
  size_t curr_index = 0;
  for (const auto& name : catalogNames) {
    if (curr_index == manifests.size()) {
      break;
    }
    // search only the not-yet-placed tail so placed manifests stay put
    const auto slot = manifests.begin() + curr_index;
    const auto it = std::find_if(slot, manifests.end(),
                                 [&name](const FileManifest& m) {
                                   return m.getFullName() == name;
                                 });
    if (it != manifests.end()) {
      if (it != slot) {
        std::iter_swap(slot, it);
      }
      ++curr_index;
    }
  }
  return manifests;
}
/**
 * Re-create the Data packets of one sub-manifest from the file on disk and
 * keep only those whose full name appears in the manifest's catalog.
 *
 * The sub-manifest index is taken from the last component of the manifest
 * name, and the manifest's catalog size is used as the sub-manifest size.
 *
 * @param filePath     File the manifest describes.
 * @param manifest     Manifest whose packets are wanted.
 * @param torrentFile  Not used by this function.
 * @return The packets present on disk that match the catalog, in file order.
 */
static vector<Data>
intializeDataPackets(const string& filePath,
                     const FileManifest manifest,
                     const TorrentFile& torrentFile)
{
  const auto& manifestName = manifest.name();
  const auto subManifestNum = manifestName.get(manifestName.size() - 1).toSequenceNumber();
  const auto& catalog = manifest.catalog();

  vector<Data> packets = packetize_file(filePath,
                                        manifestName,
                                        manifest.data_packet_size(),
                                        catalog.size(),
                                        subManifestNum);

  // Filter out invalid packet names. remove_if alone only shuffles elements,
  // so the leftover tail has to be erased for the filter to take effect.
  packets.erase(std::remove_if(packets.begin(), packets.end(),
                               [&catalog](const Data& p) {
                                 return catalog.end() == std::find(catalog.begin(),
                                                                   catalog.end(),
                                                                   p.getFullName());
                               }),
                packets.end());
  return packets;
}
static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
initializeFileState(const string& dataPath,
const FileManifest& manifest)
{
// construct the file name
const auto manifestName = manifest.name();
auto fileName = manifestName.getSubName(1, manifestName.size() - 2).toUri();
auto filePath = dataPath + fileName;
vector<bool> fileBitMap(manifest.catalog().size());
auto fbits = fs::fstream::out | fs::fstream::binary;
// if file exists, use in O/W use concatenate mode
fbits |= fs::exists(filePath) ? fs::fstream::in : fs::fstream::ate;
auto s = std::make_shared<fs::fstream>(filePath, fbits);
if (!*s) {
BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + dataPath));
}
return std::make_pair(s, fileBitMap);
}
void TorrentManager::Initialize()
{
// .../<torrent_name>/torrent-file/<implicit_digest>
string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
string manifestPath = dataPath +"/manifests";
string torrentFilePath = dataPath +"/torrent_files";
// get the torrent file segments and manifests that we have.
if (!fs::exists(torrentFilePath)) {
return;
}
m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
if (m_torrentSegments.empty()) {
return;
}
m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
auto currTorrentFile_it = m_torrentSegments.begin();
for (const auto& m : m_fileManifests) {
// find the appropriate torrent file
auto currCatalog = currTorrentFile_it->getCatalog();
while (currCatalog.end() == std::find(currCatalog.begin(), currCatalog.end(), m.getFullName()))
{
++currTorrentFile_it;
currCatalog = currTorrentFile_it->getCatalog();
}
// construct the file name
auto fileName = m.name().getSubName(1, m.name().size() - 2).toUri();
fs::path filePath = m_dataPath + fileName;
// If there are any valid packets, add corresponding state to manager
if (!fs::exists(filePath)) {
boost::filesystem::create_directories(filePath.parent_path());
continue;
}
auto packets = intializeDataPackets(filePath.string(), m, *currTorrentFile_it);
if (!packets.empty()) {
m_fileStates[m.getFullName()] = initializeFileState(m_dataPath, m);
auto& fileBitMap = m_fileStates[m.getFullName()].second;
auto read_it = packets.begin();
size_t i = 0;
for (auto name : m.catalog()) {
if (name == read_it->getFullName()) {
++read_it;
fileBitMap[i] = true;
}
++i;
}
for (const auto& d : packets) {
seed(d);
}
}
}
for (const auto& t : m_torrentSegments) {
seed(t);
}
for (const auto& m : m_fileManifests) {
seed(m);
}
}
bool TorrentManager::writeData(const Data& packet)
{
// find correct manifest
const auto& packetName = packet.getName();
auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
[&packetName](const FileManifest& m) {
return m.getName().isPrefixOf(packetName);
});
if (m_fileManifests.end() == manifest_it) {
return false;
}
// get file state out
auto& fileState = m_fileStates[manifest_it->getFullName()];
// if there is no open stream to the file
if (nullptr == fileState.first) {
fileState = initializeFileState(m_dataPath, *manifest_it);
}
auto packetNum = packetName.get(packet.getName().size() - 1).toSequenceNumber();
// if we already have the packet, do not rewrite it.
if (fileState.second[packetNum]) {
return false;
}
auto packetOffset = packetNum * manifest_it->data_packet_size();
// write data to disk
fileState.first->seekg(packetOffset);
try {
auto content = packet.getContent();
std::vector<char> data(content.value_begin(), content.value_end());
fileState.first->write(&data[0], data.size());
}
catch (io::Error &e) {
std::cerr << e.what() << std::endl;
return false;
}
// update bitmap
fileState.second[packetNum] = true;
return true;
}
// Placeholder for seeding a piece of data: the body is empty, so calling it
// currently has no effect. Initialize() calls it for every restored data
// packet, torrent segment and file manifest.
void TorrentManager::seed(const Data& data) const {
  // TODO(msweatt) IMPLEMENT ME
}
} // end ntorrent
} // end ndn