blob: 155d7cf1a323f796a96b31a94805b9912d08677b [file] [log] [blame]
#include "torrent-manager.hpp"

#include "file-manifest.hpp"
#include "torrent-file.hpp"

#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>

#include <ndn-cxx/data.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/security/signing-helpers.hpp>
#include <ndn-cxx/util/io.hpp>

#include <algorithm>
#include <iostream>
#include <limits>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
18
19namespace fs = boost::filesystem;
20
21using std::string;
22using std::vector;
23
24namespace {
25// TODO(msweatt) Move this to a utility
26template<typename T>
27static vector<T>
28load_directory(const string& dirPath,
29 ndn::io::IoEncoding encoding = ndn::io::IoEncoding::BASE_64) {
30 vector<T> structures;
Mickey Sweatt599bfef2016-04-05 19:11:20 -070031 std::set<string> fileNames;
Mickey Sweatt527b0492016-03-02 11:07:48 -080032 if (fs::exists(dirPath)) {
Mickey Sweatt599bfef2016-04-05 19:11:20 -070033 for(fs::recursive_directory_iterator it(dirPath);
34 it != fs::recursive_directory_iterator();
Mickey Sweatt527b0492016-03-02 11:07:48 -080035 ++it)
36 {
Mickey Sweatt599bfef2016-04-05 19:11:20 -070037 fileNames.insert(it->path().string());
38 }
39 for (const auto& f : fileNames) {
40 auto data_ptr = ndn::io::load<T>(f, encoding);
Mickey Sweatt527b0492016-03-02 11:07:48 -080041 if (nullptr != data_ptr) {
42 structures.push_back(*data_ptr);
43 }
44 }
45 }
46 structures.shrink_to_fit();
47 return structures;
48}
49
50} // end anonymous
51
52namespace ndn {
53namespace ntorrent {
54
55// TODO(msweatt) Move this to a utility
56static vector<ndn::Data>
57packetize_file(const fs::path& filePath,
58 const ndn::Name& commonPrefix,
59 size_t dataPacketSize,
60 size_t subManifestSize,
61 size_t subManifestNum)
62{
63 BOOST_ASSERT(0 < dataPacketSize);
64 size_t APPROX_BUFFER_SIZE = std::numeric_limits<int>::max(); // 2 * 1024 * 1024 *1024
65 auto file_size = fs::file_size(filePath);
66 auto start_offset = subManifestNum * subManifestSize * dataPacketSize;
67 // determine the number of bytes in this submanifest
68 auto subManifestLength = subManifestSize * dataPacketSize;
69 auto remainingFileLength = file_size - start_offset;
70 subManifestLength = remainingFileLength < subManifestLength
71 ? remainingFileLength
72 : subManifestLength;
73 vector<ndn::Data> packets;
74 packets.reserve(subManifestLength/dataPacketSize + 1);
75 fs::ifstream fs(filePath, fs::ifstream::binary);
76 if (!fs) {
77 BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when opening" + filePath.string()));
78 }
79 // ensure that buffer is large enough to contain whole packets
80 // buffer size is either the entire file or the smallest number of data packets >= 2 GB
81 auto buffer_size =
82 subManifestLength < APPROX_BUFFER_SIZE ?
83 subManifestLength :
84 APPROX_BUFFER_SIZE % dataPacketSize == 0 ?
85 APPROX_BUFFER_SIZE :
86 APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize);
87 vector<char> file_bytes;
88 file_bytes.reserve(buffer_size);
89 size_t bytes_read = 0;
90 fs.seekg(start_offset);
91 while(fs && bytes_read < subManifestLength && !fs.eof()) {
92 // read the file into the buffer
93 fs.read(&file_bytes.front(), buffer_size);
94 auto read_size = fs.gcount();
95 if (fs.bad() || read_size < 0) {
96 BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when reading" + filePath.string()));
97 }
98 bytes_read += read_size;
99 char *curr_start = &file_bytes.front();
100 for (size_t i = 0u; i < buffer_size; i += dataPacketSize) {
101 // Build a packet from the data
102 Name packetName = commonPrefix;
103 packetName.appendSequenceNumber(packets.size());
104 Data d(packetName);
105 auto content_length = i + dataPacketSize > buffer_size ? buffer_size - i : dataPacketSize;
106 d.setContent(encoding::makeBinaryBlock(tlv::Content, curr_start, content_length));
107 curr_start += content_length;
108 // append to the collection
109 packets.push_back(d);
110 }
111 file_bytes.clear();
112 // recompute the buffer_size
113 buffer_size =
114 subManifestLength - bytes_read < APPROX_BUFFER_SIZE ?
115 subManifestLength - bytes_read :
116 APPROX_BUFFER_SIZE % dataPacketSize == 0 ?
117 APPROX_BUFFER_SIZE :
118 APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize);
119 }
120 fs.close();
121 packets.shrink_to_fit();
122 ndn::security::KeyChain key_chain;
123 // sign all the packets
124 for (auto& p : packets) {
125 key_chain.sign(p, signingWithSha256());
126 }
127 return packets;
128}
129
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700130// =================================================================================================
131// Torrent Manager Utility Functions
132// ==================================================================================================
133
134static std::shared_ptr<Data>
135readDataPacket(const Name& packetFullName,
136 const FileManifest& manifest,
137 size_t subManifestSize,
138 fs::fstream& is) {
139 auto dataPacketSize = manifest.data_packet_size();
140 auto start_offset = manifest.submanifest_number() * subManifestSize * dataPacketSize;
141 auto packetNum = packetFullName.get(packetFullName.size() - 2).toSequenceNumber();
142 // seek to packet
143 is.sync();
144 is.seekg(start_offset + packetNum * dataPacketSize);
145 if (is.tellg() < 0) {
146 std::cerr << "bad seek" << std::endl;
147 }
148 // read contents
149 std::vector<char> bytes(dataPacketSize);
150 is.read(&bytes.front(), dataPacketSize);
151 auto read_size = is.gcount();
152 if (is.bad() || read_size < 0) {
153 std::cerr << "Bad read" << std::endl;
154 return nullptr;
155 }
156 // construct packet
157 auto packetName = packetFullName.getSubName(0, packetFullName.size() - 1);
158 auto d = make_shared<Data>(packetName);
159 d->setContent(encoding::makeBinaryBlock(tlv::Content, &bytes.front(), read_size));
160 ndn::security::KeyChain key_chain;
161 key_chain.sign(*d, signingWithSha256());
162 return d->getFullName() == packetFullName ? d : nullptr;
163}
164
Mickey Sweatt527b0492016-03-02 11:07:48 -0800165static vector<TorrentFile>
166intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
167{
168 security::KeyChain key_chain;
169 Name currSegmentFullName = initialSegmentName;
170 vector<TorrentFile> torrentSegments = load_directory<TorrentFile>(torrentFilePath);
171 // Starting with the initial segment name, verify the names, loading next name from torrentSegment
172 for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
173 TorrentFile& segment = *it;
174 key_chain.sign(segment, signingWithSha256());
175 if (segment.getFullName() != currSegmentFullName) {
176 vector<TorrentFile> correctSegments(torrentSegments.begin(), it);
177 torrentSegments.swap(correctSegments);
178 break;
179 }
180 // load the next full name
181 if (nullptr == segment.getTorrentFilePtr()) {
182 break;
183 }
184 currSegmentFullName = *segment.getTorrentFilePtr();
185 }
186 return torrentSegments;
187}
188
189static vector<FileManifest>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700190intializeFileManifests(const string& manifestPath, const vector<TorrentFile>& torrentSegments)
Mickey Sweatt527b0492016-03-02 11:07:48 -0800191{
192 security::KeyChain key_chain;
193
194 vector<FileManifest> manifests = load_directory<FileManifest>(manifestPath);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700195 if (manifests.empty()) {
196 return manifests;
197 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800198 // sign the manifests
199 std::for_each(manifests.begin(), manifests.end(),
200 [&key_chain](FileManifest& m){
201 key_chain.sign(m,signingWithSha256());
202 });
203
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700204 // put all names of initial manifests from the valid torrent files into a set
205 std::vector<ndn::Name> validInitialManifestNames;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800206 for (const auto& segment : torrentSegments) {
207 const auto& catalog = segment.getCatalog();
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700208 validInitialManifestNames.insert(validInitialManifestNames.end(),
209 catalog.begin(),
210 catalog.end());
Mickey Sweatt527b0492016-03-02 11:07:48 -0800211 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700212 auto manifest_it = manifests.begin();
213 std::vector<FileManifest> output;
214 output.reserve(manifests.size());
215 auto validIvalidInitialManifestNames_it = validInitialManifestNames.begin();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800216
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700217 for (auto& initialName : validInitialManifestNames) {
218 // starting from the initial segment
219 auto& validName = initialName;
220 if (manifests.end() == manifest_it) {
221 break;
222 }
223 auto fileName = manifest_it->file_name();
224 // sequential collect all valid segments
225 while (manifest_it != manifests.end() && manifest_it->getFullName() == validName) {
226 output.push_back(*manifest_it);
227 if (manifest_it->submanifest_ptr() != nullptr) {
228 validName = *manifest_it->submanifest_ptr();
229 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800230 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700231 else {
232 ++manifest_it;
233 break;
234 }
235 }
236 // skip the remain segments for this file (all invalid)
237 while (manifests.end() != manifest_it && manifest_it->file_name() == fileName) {
238 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800239 }
240 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700241 return output;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800242}
243
244static vector<Data>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700245initializeDataPackets(const string& filePath,
246 const FileManifest manifest,
247 size_t subManifestSize)
Mickey Sweatt527b0492016-03-02 11:07:48 -0800248{
249 vector<Data> packets;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700250 auto subManifestNum = manifest.submanifest_number();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800251
252 packets = packetize_file(filePath,
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700253 manifest.name(),
254 manifest.data_packet_size(),
255 subManifestSize,
256 subManifestNum);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800257
258 auto catalog = manifest.catalog();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800259 // Filter out invalid packet names
260 std::remove_if(packets.begin(), packets.end(),
261 [&packets, &catalog](const Data& p) {
262 return catalog.end() == std::find(catalog.begin(),
263 catalog.end(),
264 p.getFullName());
265 });
266 return packets;
267}
268
Mickey Sweattafda1f12016-04-04 17:15:11 -0700269static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700270initializeFileState(const string& dataPath,
271 const FileManifest& manifest,
272 size_t subManifestSize)
Mickey Sweattafda1f12016-04-04 17:15:11 -0700273{
274 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700275 auto fileName = manifest.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700276 auto filePath = dataPath + fileName;
277 vector<bool> fileBitMap(manifest.catalog().size());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700278 // if the file does not exist, create an empty placeholder (otherwise cannot set read-bit)
279 if (!fs::exists(filePath)) {
280 fs::ofstream fs(filePath);
281 fs << "";
Mickey Sweattafda1f12016-04-04 17:15:11 -0700282 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700283 auto s = std::make_shared<fs::fstream>(filePath,
284 fs::fstream::out
285 | fs::fstream::binary
286 | fs::fstream::in);
287 if (!*s) {
288 BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + filePath));
289 }
290 auto start_offset = manifest.submanifest_number() * subManifestSize * manifest.data_packet_size();
291 s->seekg(start_offset);
292 s->seekp(start_offset);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700293 return std::make_pair(s, fileBitMap);
294}
295
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700296//==================================================================================================
297// TorrentManager Implementation
298//==================================================================================================
299
Mickey Sweatt527b0492016-03-02 11:07:48 -0800300void TorrentManager::Initialize()
301{
302 // .../<torrent_name>/torrent-file/<implicit_digest>
303 string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
304 string manifestPath = dataPath +"/manifests";
305 string torrentFilePath = dataPath +"/torrent_files";
306
307 // get the torrent file segments and manifests that we have.
308 if (!fs::exists(torrentFilePath)) {
309 return;
310 }
311 m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
312 if (m_torrentSegments.empty()) {
313 return;
314 }
315 m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700316
317 // get the submanifest sizes
Mickey Sweatt527b0492016-03-02 11:07:48 -0800318 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700319 if (m.submanifest_number() == 0) {
320 auto manifestFileName = m.file_name();
321 m_subManifestSizes[manifestFileName] = m.catalog().size();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800322 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700323 }
324
325 for (const auto& m : m_fileManifests) {
Mickey Sweatt527b0492016-03-02 11:07:48 -0800326 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700327 auto fileName = m.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700328 fs::path filePath = m_dataPath + fileName;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800329 // If there are any valid packets, add corresponding state to manager
Mickey Sweattafda1f12016-04-04 17:15:11 -0700330 if (!fs::exists(filePath)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700331 if (!fs::exists(filePath.parent_path())) {
332 boost::filesystem::create_directories(filePath.parent_path());
333 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700334 continue;
335 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700336 auto packets = initializeDataPackets(filePath.string(), m, m_subManifestSizes[m.file_name()]);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800337 if (!packets.empty()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700338 m_fileStates[m.getFullName()] = initializeFileState(m_dataPath,
339 m,
340 m_subManifestSizes[m.file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700341 auto& fileBitMap = m_fileStates[m.getFullName()].second;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800342 auto read_it = packets.begin();
343 size_t i = 0;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700344 for (auto name : m.catalog()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700345 if (read_it == packets.end()) {
346 break;
347 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800348 if (name == read_it->getFullName()) {
349 ++read_it;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700350 fileBitMap[i] = true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800351 }
352 ++i;
353 }
354 for (const auto& d : packets) {
355 seed(d);
356 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800357 }
358 }
359 for (const auto& t : m_torrentSegments) {
360 seed(t);
361 }
362 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700363 seed(m);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800364 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700365}
Mickey Sweatt527b0492016-03-02 11:07:48 -0800366
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700367std::vector<Name>
368TorrentManager::downloadTorrentFile(const std::string& path)
369{
370 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
371 auto manifestNames = make_shared<std::vector<Name>>();
372 if (searchRes == nullptr) {
373 this->findFileManifestsToDownload(*manifestNames);
374 if (manifestNames->empty()) {
375 auto packetNames = make_shared<std::vector<Name>>();
376 this->findAllMissingDataPackets(*packetNames);
377 return *packetNames;
378 }
379 else {
380 return *manifestNames;
381 }
382 }
383 this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
384 false, {}, {});
385 return *manifestNames;
386}
387
388void
389TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
390 const std::string& path,
391 std::shared_ptr<std::vector<Name>> manifestNames,
392 bool async,
393 TorrentFileReceivedCallback onSuccess,
394 FailedCallback onFailed)
395{
396 shared_ptr<Interest> interest = createInterest(name);
397
398 auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
399 (const Interest& interest, const Data& data) {
400 // Stats Table update here...
401 m_stats_table_iter->incrementReceivedData();
402 m_retries = 0;
403
404 if (async) {
405 manifestNames->clear();
406 }
407
408 TorrentFile file(data.wireEncode());
409
410 // Write the torrent file segment to disk...
411 if (writeTorrentSegment(file, path)) {
412 // if successfully written, seed this data
413 seed(data);
414 }
415
416 const std::vector<Name>& manifestCatalog = file.getCatalog();
417 manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
418
419 shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
420
421 if (async) {
422 onSuccess(*manifestNames);
423 }
424 if (nextSegmentPtr != nullptr) {
425 this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
426 async, onSuccess, onFailed);
427 }
428 };
429
430 auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
431 (const Interest& interest) {
432 ++m_retries;
433 if (m_retries >= MAX_NUM_OF_RETRIES) {
434 ++m_stats_table_iter;
435 if (m_stats_table_iter == m_statsTable.end()) {
436 m_stats_table_iter = m_statsTable.begin();
437 }
438 }
439 if (async) {
440 onFailed(interest.getName(), "Unknown error");
441 }
442 this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
443 };
444
445 m_face->expressInterest(*interest, dataReceived, dataFailed);
446
447 if (!async) {
448 m_face->processEvents();
449 }
450}
451
452void
453TorrentManager::downloadTorrentFile(const std::string& path,
454 TorrentFileReceivedCallback onSuccess,
455 FailedCallback onFailed)
456{
457 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
458 auto manifestNames = make_shared<std::vector<Name>>();
459 if (searchRes == nullptr) {
460 this->findFileManifestsToDownload(*manifestNames);
461 if (manifestNames->empty()) {
462 auto packetNames = make_shared<std::vector<Name>>();
463 this->findAllMissingDataPackets(*packetNames);
464 onSuccess(*packetNames);
465 return;
466 }
467 else {
468 onSuccess(*manifestNames);
469 return;
470 }
471 }
472 this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
473 true, onSuccess, onFailed);
474}
475
476void
477TorrentManager::download_file_manifest(const Name& manifestName,
478 const std::string& path,
479 TorrentManager::ManifestReceivedCallback onSuccess,
480 TorrentManager::FailedCallback onFailed)
481{
482 shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
483 auto packetNames = make_shared<std::vector<Name>>();
484 if (searchRes == nullptr) {
485 this->findDataPacketsToDownload(manifestName, *packetNames);
486 onSuccess(*packetNames);
487 return;
488 }
489 this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
490}
491
492void
493TorrentManager::download_data_packet(const Name& packetName,
494 DataReceivedCallback onSuccess,
495 FailedCallback onFailed)
496{
497 if (this->dataAlreadyDownloaded(packetName)) {
498 onSuccess(packetName);
499 return;
500 }
501
502 shared_ptr<Interest> interest = this->createInterest(packetName);
503
504 auto dataReceived = [onSuccess, onFailed, this]
505 (const Interest& interest, const Data& data) {
506 // Write data to disk...
507 if(writeData(data)) {
508 seed(data);
509 }
510
511 // Stats Table update here...
512 m_stats_table_iter->incrementReceivedData();
513 m_retries = 0;
514 onSuccess(data.getName());
515 };
516 auto dataFailed = [onFailed, this]
517 (const Interest& interest) {
518 m_retries++;
519 if (m_retries >= MAX_NUM_OF_RETRIES) {
520 m_stats_table_iter++;
521 if (m_stats_table_iter == m_statsTable.end())
522 m_stats_table_iter = m_statsTable.begin();
523 }
524 onFailed(interest.getName(), "Unknown failure");
525 };
526
527 m_face->expressInterest(*interest, dataReceived, dataFailed);
528}
529
530void TorrentManager::seed(const Data& data) {
531 m_face->setInterestFilter(data.getFullName(),
532 bind(&TorrentManager::onInterestReceived, this, _1, _2),
533 RegisterPrefixSuccessCallback(),
534 bind(&TorrentManager::onRegisterFailed, this, _1, _2));
535}
536
537// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
538// Protected Helpers
539// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
540
Mickey Sweattafda1f12016-04-04 17:15:11 -0700541bool TorrentManager::writeData(const Data& packet)
542{
543 // find correct manifest
544 const auto& packetName = packet.getName();
545 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
546 [&packetName](const FileManifest& m) {
547 return m.getName().isPrefixOf(packetName);
548 });
549 if (m_fileManifests.end() == manifest_it) {
550 return false;
551 }
552 // get file state out
553 auto& fileState = m_fileStates[manifest_it->getFullName()];
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700554
Mickey Sweattafda1f12016-04-04 17:15:11 -0700555 // if there is no open stream to the file
556 if (nullptr == fileState.first) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700557 fs::path filePath = m_dataPath + manifest_it->file_name();
558 if (!fs::exists(filePath)) {
559 fs::create_directories(filePath.parent_path());
560 }
561
562 fileState = initializeFileState(m_dataPath,
563 *manifest_it,
564 m_subManifestSizes[manifest_it->file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700565 }
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700566 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700567 // if we already have the packet, do not rewrite it.
568 if (fileState.second[packetNum]) {
569 return false;
570 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700571 auto dataPacketSize = manifest_it->data_packet_size();
572 auto initial_offset = manifest_it->submanifest_number() * m_subManifestSizes[manifest_it->file_name()] * dataPacketSize;
573 auto packetOffset = initial_offset + packetNum * dataPacketSize;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700574 // write data to disk
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700575 fileState.first->seekp(packetOffset);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700576 try {
577 auto content = packet.getContent();
578 std::vector<char> data(content.value_begin(), content.value_end());
579 fileState.first->write(&data[0], data.size());
580 }
581 catch (io::Error &e) {
582 std::cerr << e.what() << std::endl;
583 return false;
584 }
585 // update bitmap
586 fileState.second[packetNum] = true;
587 return true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800588}
589
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700590bool TorrentManager::writeTorrentSegment(const TorrentFile& segment, const std::string& path)
591{
592 // validate that this torrent segment belongs to our torrent
593 auto torrentPrefix = m_torrentFileName.getSubName(0, m_torrentFileName.size() - 1);
594 if (!torrentPrefix.isPrefixOf(segment.getName())) {
595 return false;
596 }
597
598 auto segmentNum = segment.getSegmentNumber();
599 // check if we already have it
600 if (m_torrentSegments.end() != std::find(m_torrentSegments.begin(), m_torrentSegments.end(),
601 segment))
602 {
603 return false;
604 }
605 // write to disk at path
606 if (!fs::exists(path)) {
607 fs::create_directories(path);
608 }
609 auto filename = path + to_string(segmentNum);
610 // if there is already a file on disk for this torrent segment, determine if we should override
611 if (fs::exists(filename)) {
612 auto segmentOnDisk_ptr = io::load<TorrentFile>(filename);
613 if (nullptr != segmentOnDisk_ptr && *segmentOnDisk_ptr == segment) {
614 return false;
615 }
616 }
617 io::save(segment, filename);
618 // add to collection
619 auto it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(),
620 [segmentNum](const TorrentFile& t){
621 return t.getSegmentNumber() > segmentNum;
622 });
623 m_torrentSegments.insert(it, segment);
624 return true;
625}
626
627bool TorrentManager::writeFileManifest(const FileManifest& manifest, const std::string& path)
628{
629 auto subManifestNum = manifest.submanifest_number();
630 fs::path filename = path + manifest.file_name() + "/" + to_string(subManifestNum);
631 // check if we already have it
632 if (m_fileManifests.end() != std::find(m_fileManifests.begin(), m_fileManifests.end(),
633 manifest))
634 {
635 return false;
636 }
637
638 // write to disk at path
639 if (!fs::exists(filename.parent_path())) {
640 boost::filesystem::create_directories(filename.parent_path());
641 }
642 // if there is already a file on disk for this torrent segment, determine if we should override
643 if (fs::exists(filename)) {
644 auto submanifestOnDisk_ptr = io::load<FileManifest>(filename.string());
645 if (nullptr != submanifestOnDisk_ptr && *submanifestOnDisk_ptr == manifest) {
646 return false;
647 }
648 }
649 io::save(manifest, filename.string());
650 // add to collection
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700651 auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
652 [&manifest](const FileManifest& m){
653 return m.file_name() > manifest.file_name()
654 || (m.file_name() == manifest.file_name()
655 && (m.submanifest_number() > manifest.submanifest_number()));
656 });
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700657 // update the state of the manager
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700658 m_fileManifests.insert(it, manifest);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700659 if (0 == manifest.submanifest_number()) {
660 m_subManifestSizes[manifest.file_name()] = manifest.catalog().size();
661 }
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700662 return true;
663}
664
spirosmastorakisa46eee42016-04-05 14:24:45 -0700665void
666TorrentManager::downloadFileManifestSegment(const Name& manifestName,
667 const std::string& path,
668 std::shared_ptr<std::vector<Name>> packetNames,
669 TorrentManager::ManifestReceivedCallback onSuccess,
670 TorrentManager::FailedCallback onFailed)
671{
672 shared_ptr<Interest> interest = this->createInterest(manifestName);
673
674 auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
675 (const Interest& interest, const Data& data) {
676 // Stats Table update here...
677 m_stats_table_iter->incrementReceivedData();
678 m_retries = 0;
679
680 FileManifest file(data.wireEncode());
681
682 // Write the file manifest segment to disk...
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700683 if( writeFileManifest(file, path)) {
684 seed(file);
685 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700686
687 const std::vector<Name>& packetsCatalog = file.catalog();
688 packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
689 shared_ptr<Name> nextSegmentPtr = file.submanifest_ptr();
690 if (nextSegmentPtr != nullptr) {
691 this->downloadFileManifestSegment(*nextSegmentPtr, path, packetNames, onSuccess, onFailed);
692 }
693 else
694 onSuccess(*packetNames);
695 };
696
697 auto dataFailed = [packetNames, path, manifestName, onFailed, this]
698 (const Interest& interest) {
699 m_retries++;
700 if (m_retries >= MAX_NUM_OF_RETRIES) {
701 m_stats_table_iter++;
702 if (m_stats_table_iter == m_statsTable.end())
703 m_stats_table_iter = m_statsTable.begin();
704 }
705 onFailed(interest.getName(), "Unknown failure");
706 };
707
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700708 m_face->expressInterest(*interest, dataReceived, dataFailed);
spirosmastorakisa46eee42016-04-05 14:24:45 -0700709}
710
711void
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700712TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700713{
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700714 // handle if it is a torrent-file
715 const auto& interestName = interest.getName();
716 std::shared_ptr<Data> data = nullptr;
717 auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
718
719 // determine if it is torrent file (that we have)
720 auto torrent_it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(), cmp);
721 if (m_torrentSegments.end() != torrent_it) {
722 data = std::make_shared<Data>(*torrent_it);
spirosmastorakis50642f82016-04-08 12:11:18 -0700723 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700724 else {
725 // determine if it is manifest (that we have)
726 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(), cmp);
727 if (m_fileManifests.end() != manifest_it) {
728 data = std::make_shared<Data>(*manifest_it) ;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700729 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700730 else {
731 // determine if it is data packet (that we have)
732 auto manifestName = interestName.getSubName(0, interestName.size() - 2);
733 auto map_it = std::find_if(m_fileStates.begin(), m_fileStates.end(),
734 [&manifestName](const std::pair<Name,
735 std::pair<std::shared_ptr<fs::fstream>,
736 std::vector<bool>>>& kv){
737 return manifestName.isPrefixOf(kv.first);
738 });
739 if (m_fileStates.end() != map_it) {
740 auto packetName = interestName.getSubName(0, interestName.size() - 1);
741 // get out the bitmap to be sure we have the packet
742 auto& fileState = map_it->second;
743 const auto &bitmap = fileState.second;
744 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
745 if (bitmap[packetNum]) {
746 // get the manifest
747 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
748 [&manifestName](const FileManifest& m) {
749 return manifestName.isPrefixOf(m.name());
750 });
751 auto manifestFileName = manifest_it->file_name();
752 auto filePath = m_dataPath + manifestFileName;
753 // TODO(msweatt) Explore why fileState stream does not work
754 fs::fstream is (filePath, fs::fstream::in | fs::fstream::binary);
755 data = readDataPacket(interestName,
756 *manifest_it,
757 m_subManifestSizes[manifestFileName],
758 is);
759 }
760 }
761 }
762 }
763 if (nullptr != data) {
764 m_face->put(*data);
765 }
766 else {
767 // TODO(msweatt) NACK
768 std::cerr << "NACK: " << interest << std::endl;
769 }
770 return;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700771}
772
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700773void
774TorrentManager::onRegisterFailed(const Name& prefix, const std::string& reason)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700775{
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700776 std::cerr << "ERROR: Failed to register prefix \""
777 << prefix << "\" in local hub's daemon (" << reason << ")"
778 << std::endl;
779 m_face->shutdown();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700780}
781
spirosmastorakis50642f82016-04-08 12:11:18 -0700782shared_ptr<Name>
783TorrentManager::findTorrentFileSegmentToDownload()
784{
785 // if we have no segments
786 if (m_torrentSegments.empty()) {
787 return make_shared<Name>(m_torrentFileName);
788 }
789 // otherwise just return the next segment ptr of the last segment we have
790 return m_torrentSegments.back().getTorrentFilePtr();
791}
792
793shared_ptr<Name>
794TorrentManager::findManifestSegmentToDownload(const Name& manifestName)
795{
796 //sequentially find whether we have downloaded any segments of this manifest file
797 Name manifestPrefix = manifestName.getSubName(0, manifestName.size() - 2);
798 auto it = std::find_if(m_fileManifests.rbegin(), m_fileManifests.rend(),
799 [&manifestPrefix] (const FileManifest& f) {
800 return manifestPrefix.isPrefixOf(f.getName());
801 });
802
803 // if we do not have any segments of the file manifest
804 if (it == m_fileManifests.rend()) {
805 return make_shared<Name>(manifestName);
806 }
807
808 // if we already have the requested segment of the file manifest
809 if (it->submanifest_number() >= manifestName.get(manifestName.size() - 2).toSequenceNumber()) {
810 return it->submanifest_ptr();
811 }
812 // if we do not have the requested segment
813 else {
814 return make_shared<Name>(manifestName);
815 }
816}
817
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700818void
819TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
820{
821 std::vector<Name> manifests;
822 // insert the first segment name of all the file manifests to the vector
823 for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
824 manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
825 }
826 // for each file
827 for (const auto& manifestName : manifests) {
828 // find the first (if any) segment we are missing
829 shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
830 if (nullptr != manifestSegmentName) {
831 manifestNames.push_back(*manifestSegmentName);
832 }
833 }
834}
835
spirosmastorakis50642f82016-04-08 12:11:18 -0700836bool
837TorrentManager::dataAlreadyDownloaded(const Name& dataName)
838{
839
840 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
841 [&dataName](const FileManifest& m) {
842 return m.getName().isPrefixOf(dataName);
843 });
844
845 // if we do not have the file manifest, just return false
846 if (manifest_it == m_fileManifests.end()) {
847 return false;
848 }
849
850 // find the pair of (std::shared_ptr<fs::fstream>, std::vector<bool>)
851 // that corresponds to the specific submanifest
852 auto& fileState = m_fileStates[manifest_it->getFullName()];
853
854 auto dataNum = dataName.get(dataName.size() - 2).toSequenceNumber();
855
856 // find whether we have the requested packet from the bitmap
857 return fileState.second[dataNum];
858}
859
860void
spirosmastorakis50642f82016-04-08 12:11:18 -0700861TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames)
862{
863 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
864 [&manifestName](const FileManifest& m) {
865 return m.name().getSubName(0, m.name().size()
866 - 1).isPrefixOf(manifestName);
867 });
868
869 for (auto j = manifest_it; j != m_fileManifests.end(); j++) {
870 auto& fileState = m_fileStates[j->getFullName()];
871 for (size_t dataNum = 0; dataNum < j->catalog().size(); ++dataNum) {
872 if (!fileState.second[dataNum]) {
873 packetNames.push_back(j->catalog()[dataNum]);
874 }
875 }
876
877 // check that the next manifest in the vector refers to the next segment of the same file
878 if ((j + 1) != m_fileManifests.end() && (j+1)->file_name() != manifest_it->file_name()) {
879 break;
880 }
881 }
882}
883
884void
885TorrentManager::findAllMissingDataPackets(std::vector<Name>& packetNames)
886{
887 for (auto j = m_fileManifests.begin(); j != m_fileManifests.end(); j++) {
888 auto& fileState = m_fileStates[j->getFullName()];
889 for (auto i = j->catalog().begin(); i != j->catalog().end(); i++) {
890 auto dataNum = i->get(i->size() - 2).toSequenceNumber();
891 if (!fileState.second[dataNum]) {
892 packetNames.push_back(*i);
893 }
894 }
895 }
896}
897
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700898shared_ptr<Interest>
899TorrentManager::createInterest(Name name)
900{
901 shared_ptr<Interest> interest = make_shared<Interest>(name);
902 interest->setInterestLifetime(time::milliseconds(2000));
903 interest->setMustBeFresh(true);
904
905 // Select routable prefix
906 Link link(name, { {1, m_stats_table_iter->getRecordName()} });
907 m_keyChain->sign(link, signingWithSha256());
908 Block linkWire = link.wireEncode();
909
910 // Stats Table update here...
911 m_stats_table_iter->incrementSentInterests();
912
913 m_sortingCounter++;
914 if (m_sortingCounter >= SORTING_INTERVAL) {
915 m_sortingCounter = 0;
916 m_statsTable.sort();
917 m_stats_table_iter = m_statsTable.begin();
918 m_retries = 0;
919 }
920
921 interest->setLink(linkWire);
922
923 return interest;
924}
925
Mickey Sweatt527b0492016-03-02 11:07:48 -0800926} // end ntorrent
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700927} // end ndn