blob: 4a0eaffe1dd414b543d645a6101695d6046c823a [file] [log] [blame]
Mickey Sweatt527b0492016-03-02 11:07:48 -08001#include "torrent-manager.hpp"
2
3#include "file-manifest.hpp"
4#include "torrent-file.hpp"
5
6#include <boost/filesystem.hpp>
7#include <boost/filesystem/fstream.hpp>
8
9#include <ndn-cxx/data.hpp>
10#include <ndn-cxx/security/key-chain.hpp>
11#include <ndn-cxx/security/signing-helpers.hpp>
12#include <ndn-cxx/util/io.hpp>
13
14#include <set>
15#include <string>
16#include <unordered_map>
17#include <vector>
18
19namespace fs = boost::filesystem;
20
21using std::string;
22using std::vector;
23
24namespace {
25// TODO(msweatt) Move this to a utility
26template<typename T>
27static vector<T>
28load_directory(const string& dirPath,
29 ndn::io::IoEncoding encoding = ndn::io::IoEncoding::BASE_64) {
30 vector<T> structures;
Mickey Sweatt599bfef2016-04-05 19:11:20 -070031 std::set<string> fileNames;
Mickey Sweatt527b0492016-03-02 11:07:48 -080032 if (fs::exists(dirPath)) {
Mickey Sweatt599bfef2016-04-05 19:11:20 -070033 for(fs::recursive_directory_iterator it(dirPath);
34 it != fs::recursive_directory_iterator();
Mickey Sweatt527b0492016-03-02 11:07:48 -080035 ++it)
36 {
Mickey Sweatt599bfef2016-04-05 19:11:20 -070037 fileNames.insert(it->path().string());
38 }
39 for (const auto& f : fileNames) {
40 auto data_ptr = ndn::io::load<T>(f, encoding);
Mickey Sweatt527b0492016-03-02 11:07:48 -080041 if (nullptr != data_ptr) {
42 structures.push_back(*data_ptr);
43 }
44 }
45 }
46 structures.shrink_to_fit();
47 return structures;
48}
49
50} // end anonymous
51
52namespace ndn {
53namespace ntorrent {
54
55// TODO(msweatt) Move this to a utility
56static vector<ndn::Data>
57packetize_file(const fs::path& filePath,
58 const ndn::Name& commonPrefix,
59 size_t dataPacketSize,
60 size_t subManifestSize,
61 size_t subManifestNum)
62{
63 BOOST_ASSERT(0 < dataPacketSize);
64 size_t APPROX_BUFFER_SIZE = std::numeric_limits<int>::max(); // 2 * 1024 * 1024 *1024
65 auto file_size = fs::file_size(filePath);
66 auto start_offset = subManifestNum * subManifestSize * dataPacketSize;
67 // determine the number of bytes in this submanifest
68 auto subManifestLength = subManifestSize * dataPacketSize;
69 auto remainingFileLength = file_size - start_offset;
70 subManifestLength = remainingFileLength < subManifestLength
71 ? remainingFileLength
72 : subManifestLength;
73 vector<ndn::Data> packets;
74 packets.reserve(subManifestLength/dataPacketSize + 1);
75 fs::ifstream fs(filePath, fs::ifstream::binary);
76 if (!fs) {
77 BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when opening" + filePath.string()));
78 }
79 // ensure that buffer is large enough to contain whole packets
80 // buffer size is either the entire file or the smallest number of data packets >= 2 GB
81 auto buffer_size =
82 subManifestLength < APPROX_BUFFER_SIZE ?
83 subManifestLength :
84 APPROX_BUFFER_SIZE % dataPacketSize == 0 ?
85 APPROX_BUFFER_SIZE :
86 APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize);
87 vector<char> file_bytes;
88 file_bytes.reserve(buffer_size);
89 size_t bytes_read = 0;
90 fs.seekg(start_offset);
91 while(fs && bytes_read < subManifestLength && !fs.eof()) {
92 // read the file into the buffer
93 fs.read(&file_bytes.front(), buffer_size);
94 auto read_size = fs.gcount();
95 if (fs.bad() || read_size < 0) {
96 BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when reading" + filePath.string()));
97 }
98 bytes_read += read_size;
99 char *curr_start = &file_bytes.front();
100 for (size_t i = 0u; i < buffer_size; i += dataPacketSize) {
101 // Build a packet from the data
102 Name packetName = commonPrefix;
103 packetName.appendSequenceNumber(packets.size());
104 Data d(packetName);
105 auto content_length = i + dataPacketSize > buffer_size ? buffer_size - i : dataPacketSize;
106 d.setContent(encoding::makeBinaryBlock(tlv::Content, curr_start, content_length));
107 curr_start += content_length;
108 // append to the collection
109 packets.push_back(d);
110 }
111 file_bytes.clear();
112 // recompute the buffer_size
113 buffer_size =
114 subManifestLength - bytes_read < APPROX_BUFFER_SIZE ?
115 subManifestLength - bytes_read :
116 APPROX_BUFFER_SIZE % dataPacketSize == 0 ?
117 APPROX_BUFFER_SIZE :
118 APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize);
119 }
120 fs.close();
121 packets.shrink_to_fit();
122 ndn::security::KeyChain key_chain;
123 // sign all the packets
124 for (auto& p : packets) {
125 key_chain.sign(p, signingWithSha256());
126 }
127 return packets;
128}
129
130static vector<TorrentFile>
131intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
132{
133 security::KeyChain key_chain;
134 Name currSegmentFullName = initialSegmentName;
135 vector<TorrentFile> torrentSegments = load_directory<TorrentFile>(torrentFilePath);
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700136
Mickey Sweatt527b0492016-03-02 11:07:48 -0800137 // Starting with the initial segment name, verify the names, loading next name from torrentSegment
138 for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
139 TorrentFile& segment = *it;
140 key_chain.sign(segment, signingWithSha256());
141 if (segment.getFullName() != currSegmentFullName) {
142 vector<TorrentFile> correctSegments(torrentSegments.begin(), it);
143 torrentSegments.swap(correctSegments);
144 break;
145 }
146 // load the next full name
147 if (nullptr == segment.getTorrentFilePtr()) {
148 break;
149 }
150 currSegmentFullName = *segment.getTorrentFilePtr();
151 }
152 return torrentSegments;
153}
154
155static vector<FileManifest>
156intializeFileManifests(const string& manifestPath, vector<TorrentFile> torrentSegments)
157{
158 security::KeyChain key_chain;
159
160 vector<FileManifest> manifests = load_directory<FileManifest>(manifestPath);
161
162 // sign the manifests
163 std::for_each(manifests.begin(), manifests.end(),
164 [&key_chain](FileManifest& m){
165 key_chain.sign(m,signingWithSha256());
166 });
167
168 // put all names of manifests from the valid torrent files into a set
169 std::set<ndn::Name> validManifestNames;
170 for (const auto& segment : torrentSegments) {
171 const auto& catalog = segment.getCatalog();
172 validManifestNames.insert(catalog.begin(), catalog.end());
173 }
174
175 // put all names of file manifests from disk into a set
176 std::set<ndn::Name> loadedManifestNames;
177 std::for_each(manifests.begin(), manifests.end(),
178 [&loadedManifestNames](const FileManifest& m){
179 loadedManifestNames.insert(m.getFullName());
180 });
181
182 // the set of fileManifests that we have is simply the intersection
183 std::set<Name> output;
184 std::set_intersection(validManifestNames.begin() , validManifestNames.end(),
185 loadedManifestNames.begin(), loadedManifestNames.end(),
186 std::inserter(output, output.begin()));
187
188 // filter out those manifests that are not in this set
189 std::remove_if(manifests.begin(),
190 manifests.end(),
191 [&output](const FileManifest& m) {
192 return (output.end() == output.find(m.name()));
193 });
194
195 // order the manifests in the same order they are in the torrent
196 std::vector<Name> catalogNames;
197 for (const auto& segment : torrentSegments) {
198 const auto& catalog = segment.getCatalog();
199 catalogNames.insert(catalogNames.end(), catalog.begin(), catalog.end());
200 }
201 size_t curr_index = 0;
202 for (auto name : catalogNames) {
203 auto it = std::find_if(manifests.begin(), manifests.end(),
204 [&name](const FileManifest& m) {
205 return m.getFullName() == name;
206 });
207 if (it != manifests.end()) {
208 // not already in the correct position
209 if (it != manifests.begin() + curr_index) {
210 std::swap(manifests[curr_index], *it);
211 }
212 ++curr_index;
213 }
214 }
215
216 return manifests;
217}
218
219static vector<Data>
220intializeDataPackets(const string& filePath,
221 const FileManifest manifest,
222 const TorrentFile& torrentFile)
223{
224 vector<Data> packets;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700225 auto subManifestNum = manifest.submanifest_number();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800226
227 packets = packetize_file(filePath,
228 manifest.name(),
229 manifest.data_packet_size(),
230 manifest.catalog().size(),
231 subManifestNum);
232
233 auto catalog = manifest.catalog();
234
235 // Filter out invalid packet names
236 std::remove_if(packets.begin(), packets.end(),
237 [&packets, &catalog](const Data& p) {
238 return catalog.end() == std::find(catalog.begin(),
239 catalog.end(),
240 p.getFullName());
241 });
242 return packets;
243}
244
Mickey Sweattafda1f12016-04-04 17:15:11 -0700245static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
246initializeFileState(const string& dataPath,
247 const FileManifest& manifest)
248{
249 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700250 auto fileName = manifest.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700251 auto filePath = dataPath + fileName;
252 vector<bool> fileBitMap(manifest.catalog().size());
253 auto fbits = fs::fstream::out | fs::fstream::binary;
254 // if file exists, use in O/W use concatenate mode
255 fbits |= fs::exists(filePath) ? fs::fstream::in : fs::fstream::ate;
256 auto s = std::make_shared<fs::fstream>(filePath, fbits);
257 if (!*s) {
258 BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + dataPath));
259 }
260 return std::make_pair(s, fileBitMap);
261}
262
Mickey Sweatt527b0492016-03-02 11:07:48 -0800263void TorrentManager::Initialize()
264{
265 // .../<torrent_name>/torrent-file/<implicit_digest>
266 string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
267 string manifestPath = dataPath +"/manifests";
268 string torrentFilePath = dataPath +"/torrent_files";
269
270 // get the torrent file segments and manifests that we have.
271 if (!fs::exists(torrentFilePath)) {
272 return;
273 }
274 m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
275 if (m_torrentSegments.empty()) {
276 return;
277 }
278 m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
279 auto currTorrentFile_it = m_torrentSegments.begin();
280 for (const auto& m : m_fileManifests) {
281 // find the appropriate torrent file
282 auto currCatalog = currTorrentFile_it->getCatalog();
283 while (currCatalog.end() == std::find(currCatalog.begin(), currCatalog.end(), m.getFullName()))
284 {
285 ++currTorrentFile_it;
286 currCatalog = currTorrentFile_it->getCatalog();
287 }
288 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700289 auto fileName = m.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700290 fs::path filePath = m_dataPath + fileName;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800291 // If there are any valid packets, add corresponding state to manager
Mickey Sweattafda1f12016-04-04 17:15:11 -0700292 if (!fs::exists(filePath)) {
293 boost::filesystem::create_directories(filePath.parent_path());
294 continue;
295 }
296 auto packets = intializeDataPackets(filePath.string(), m, *currTorrentFile_it);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800297 if (!packets.empty()) {
Mickey Sweattafda1f12016-04-04 17:15:11 -0700298 m_fileStates[m.getFullName()] = initializeFileState(m_dataPath, m);
299 auto& fileBitMap = m_fileStates[m.getFullName()].second;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800300 auto read_it = packets.begin();
301 size_t i = 0;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700302 for (auto name : m.catalog()) {
Mickey Sweatt527b0492016-03-02 11:07:48 -0800303 if (name == read_it->getFullName()) {
304 ++read_it;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700305 fileBitMap[i] = true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800306 }
307 ++i;
308 }
309 for (const auto& d : packets) {
310 seed(d);
311 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800312 }
313 }
314 for (const auto& t : m_torrentSegments) {
315 seed(t);
316 }
317 for (const auto& m : m_fileManifests) {
318 seed(m);
319 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700320}
Mickey Sweatt527b0492016-03-02 11:07:48 -0800321
Mickey Sweattafda1f12016-04-04 17:15:11 -0700322bool TorrentManager::writeData(const Data& packet)
323{
324 // find correct manifest
325 const auto& packetName = packet.getName();
326 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
327 [&packetName](const FileManifest& m) {
328 return m.getName().isPrefixOf(packetName);
329 });
330 if (m_fileManifests.end() == manifest_it) {
331 return false;
332 }
333 // get file state out
334 auto& fileState = m_fileStates[manifest_it->getFullName()];
335 // if there is no open stream to the file
336 if (nullptr == fileState.first) {
337 fileState = initializeFileState(m_dataPath, *manifest_it);
338 }
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700339 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700340 // if we already have the packet, do not rewrite it.
341 if (fileState.second[packetNum]) {
342 return false;
343 }
344 auto packetOffset = packetNum * manifest_it->data_packet_size();
345 // write data to disk
346 fileState.first->seekg(packetOffset);
347 try {
348 auto content = packet.getContent();
349 std::vector<char> data(content.value_begin(), content.value_end());
350 fileState.first->write(&data[0], data.size());
351 }
352 catch (io::Error &e) {
353 std::cerr << e.what() << std::endl;
354 return false;
355 }
356 // update bitmap
357 fileState.second[packetNum] = true;
358 return true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800359}
360
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700361bool TorrentManager::writeTorrentSegment(const TorrentFile& segment, const std::string& path)
362{
363 // validate that this torrent segment belongs to our torrent
364 auto torrentPrefix = m_torrentFileName.getSubName(0, m_torrentFileName.size() - 1);
365 if (!torrentPrefix.isPrefixOf(segment.getName())) {
366 return false;
367 }
368
369 auto segmentNum = segment.getSegmentNumber();
370 // check if we already have it
371 if (m_torrentSegments.end() != std::find(m_torrentSegments.begin(), m_torrentSegments.end(),
372 segment))
373 {
374 return false;
375 }
376 // write to disk at path
377 if (!fs::exists(path)) {
378 fs::create_directories(path);
379 }
380 auto filename = path + to_string(segmentNum);
381 // if there is already a file on disk for this torrent segment, determine if we should override
382 if (fs::exists(filename)) {
383 auto segmentOnDisk_ptr = io::load<TorrentFile>(filename);
384 if (nullptr != segmentOnDisk_ptr && *segmentOnDisk_ptr == segment) {
385 return false;
386 }
387 }
388 io::save(segment, filename);
389 // add to collection
390 auto it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(),
391 [segmentNum](const TorrentFile& t){
392 return t.getSegmentNumber() > segmentNum;
393 });
394 m_torrentSegments.insert(it, segment);
395 return true;
396}
397
398bool TorrentManager::writeFileManifest(const FileManifest& manifest, const std::string& path)
399{
400 auto subManifestNum = manifest.submanifest_number();
401 fs::path filename = path + manifest.file_name() + "/" + to_string(subManifestNum);
402 // check if we already have it
403 if (m_fileManifests.end() != std::find(m_fileManifests.begin(), m_fileManifests.end(),
404 manifest))
405 {
406 return false;
407 }
408
409 // write to disk at path
410 if (!fs::exists(filename.parent_path())) {
411 boost::filesystem::create_directories(filename.parent_path());
412 }
413 // if there is already a file on disk for this torrent segment, determine if we should override
414 if (fs::exists(filename)) {
415 auto submanifestOnDisk_ptr = io::load<FileManifest>(filename.string());
416 if (nullptr != submanifestOnDisk_ptr && *submanifestOnDisk_ptr == manifest) {
417 return false;
418 }
419 }
420 io::save(manifest, filename.string());
421 // add to collection
422 // add to collection
423 auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
424 [&manifest](const FileManifest& m){
425 return m.file_name() > manifest.file_name()
426 || (m.file_name() == manifest.file_name()
427 && (m.submanifest_number() > manifest.submanifest_number()));
428 });
429 m_fileManifests.insert(it, manifest);
430 return true;
431}
432
Mickey Sweatt527b0492016-03-02 11:07:48 -0800433void TorrentManager::seed(const Data& data) const {
434 // TODO(msweatt) IMPLEMENT ME
435}
436
spirosmastorakisa46eee42016-04-05 14:24:45 -0700437void
438TorrentManager::downloadTorrentFile(const std::string& path,
spirosmastorakis50642f82016-04-08 12:11:18 -0700439 TorrentFileReceivedCallback onSuccess,
spirosmastorakisa46eee42016-04-05 14:24:45 -0700440 FailedCallback onFailed)
441{
spirosmastorakis50642f82016-04-08 12:11:18 -0700442 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700443 auto manifestNames = make_shared<std::vector<Name>>();
spirosmastorakis50642f82016-04-08 12:11:18 -0700444 if (searchRes == nullptr) {
445 this->findFileManifestsToDownload(*manifestNames);
446 if (manifestNames->empty()) {
447 auto packetNames = make_shared<std::vector<Name>>();
448 this->findAllMissingDataPackets(*packetNames);
449 onSuccess(*packetNames);
450 return;
451 }
452 else {
453 onSuccess(*manifestNames);
454 return;
455 }
456 }
457 this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
spirosmastorakisa46eee42016-04-05 14:24:45 -0700458 true, onSuccess, onFailed);
459}
460
461std::vector<Name>
462TorrentManager::downloadTorrentFile(const std::string& path)
463{
spirosmastorakis50642f82016-04-08 12:11:18 -0700464 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700465 auto manifestNames = make_shared<std::vector<Name>>();
spirosmastorakis50642f82016-04-08 12:11:18 -0700466 if (searchRes == nullptr) {
467 this->findFileManifestsToDownload(*manifestNames);
468 if (manifestNames->empty()) {
469 auto packetNames = make_shared<std::vector<Name>>();
470 this->findAllMissingDataPackets(*packetNames);
471 return *packetNames;
472 }
473 else {
474 return *manifestNames;
475 }
476 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700477 this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
478 false, {}, {});
479 return *manifestNames;
480}
481
482void
483TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
484 const std::string& path,
485 std::shared_ptr<std::vector<Name>> manifestNames,
486 bool async,
spirosmastorakis50642f82016-04-08 12:11:18 -0700487 TorrentFileReceivedCallback onSuccess,
spirosmastorakisa46eee42016-04-05 14:24:45 -0700488 FailedCallback onFailed)
489{
490 shared_ptr<Interest> interest = createInterest(name);
491
492 auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
493 (const Interest& interest, const Data& data) {
494 // Stats Table update here...
495 m_stats_table_iter->incrementReceivedData();
496 m_retries = 0;
497
spirosmastorakis50642f82016-04-08 12:11:18 -0700498 if (async) {
499 manifestNames->clear();
500 }
501
spirosmastorakisa46eee42016-04-05 14:24:45 -0700502 TorrentFile file(data.wireEncode());
503
504 // Write the torrent file segment to disk...
505 writeTorrentSegment(file, path);
506
507 const std::vector<Name>& manifestCatalog = file.getCatalog();
508 manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
509
510 shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
511
512 if (async) {
spirosmastorakis50642f82016-04-08 12:11:18 -0700513 onSuccess(*manifestNames);
spirosmastorakisa46eee42016-04-05 14:24:45 -0700514 }
515 if (nextSegmentPtr != nullptr) {
516 this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
517 async, onSuccess, onFailed);
518 }
519 };
520
521 auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
522 (const Interest& interest) {
523 ++m_retries;
524 if (m_retries >= MAX_NUM_OF_RETRIES) {
525 ++m_stats_table_iter;
526 if (m_stats_table_iter == m_statsTable.end()) {
527 m_stats_table_iter = m_statsTable.begin();
528 }
529 }
530 if (async) {
531 onFailed(interest.getName(), "Unknown error");
532 }
533 this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
534 };
535
536 m_face.expressInterest(*interest, dataReceived, dataFailed);
537
538 if (!async) {
539 m_face.processEvents();
540 }
541}
542
543
544void
545TorrentManager::download_file_manifest(const Name& manifestName,
546 const std::string& path,
547 TorrentManager::ManifestReceivedCallback onSuccess,
548 TorrentManager::FailedCallback onFailed)
549{
spirosmastorakis50642f82016-04-08 12:11:18 -0700550 shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
spirosmastorakisa46eee42016-04-05 14:24:45 -0700551 auto packetNames = make_shared<std::vector<Name>>();
spirosmastorakis50642f82016-04-08 12:11:18 -0700552 if (searchRes == nullptr) {
553 this->findDataPacketsToDownload(manifestName, *packetNames);
554 onSuccess(*packetNames);
555 return;
556 }
557 this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
spirosmastorakisa46eee42016-04-05 14:24:45 -0700558}
559
560void
561TorrentManager::downloadFileManifestSegment(const Name& manifestName,
562 const std::string& path,
563 std::shared_ptr<std::vector<Name>> packetNames,
564 TorrentManager::ManifestReceivedCallback onSuccess,
565 TorrentManager::FailedCallback onFailed)
566{
567 shared_ptr<Interest> interest = this->createInterest(manifestName);
568
569 auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
570 (const Interest& interest, const Data& data) {
571 // Stats Table update here...
572 m_stats_table_iter->incrementReceivedData();
573 m_retries = 0;
574
575 FileManifest file(data.wireEncode());
576
577 // Write the file manifest segment to disk...
578 writeFileManifest(file, path);
579
580 const std::vector<Name>& packetsCatalog = file.catalog();
581 packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
582 shared_ptr<Name> nextSegmentPtr = file.submanifest_ptr();
583 if (nextSegmentPtr != nullptr) {
584 this->downloadFileManifestSegment(*nextSegmentPtr, path, packetNames, onSuccess, onFailed);
585 }
586 else
587 onSuccess(*packetNames);
588 };
589
590 auto dataFailed = [packetNames, path, manifestName, onFailed, this]
591 (const Interest& interest) {
592 m_retries++;
593 if (m_retries >= MAX_NUM_OF_RETRIES) {
594 m_stats_table_iter++;
595 if (m_stats_table_iter == m_statsTable.end())
596 m_stats_table_iter = m_statsTable.begin();
597 }
598 onFailed(interest.getName(), "Unknown failure");
599 };
600
601 m_face.expressInterest(*interest, dataReceived, dataFailed);
602}
603
604void
605TorrentManager::download_data_packet(const Name& packetName,
606 DataReceivedCallback onSuccess,
607 FailedCallback onFailed)
608{
spirosmastorakis50642f82016-04-08 12:11:18 -0700609 if (this->dataAlreadyDownloaded(packetName)) {
610 onSuccess(packetName);
611 return;
612 }
613
spirosmastorakisa46eee42016-04-05 14:24:45 -0700614 shared_ptr<Interest> interest = this->createInterest(packetName);
615
616 auto dataReceived = [onSuccess, onFailed, this]
617 (const Interest& interest, const Data& data) {
618 // Write data to disk...
619 writeData(data);
620
621 // Stats Table update here...
622 m_stats_table_iter->incrementReceivedData();
623 m_retries = 0;
624 onSuccess(data.getName());
625 };
626 auto dataFailed = [onFailed, this]
627 (const Interest& interest) {
628 m_retries++;
629 if (m_retries >= MAX_NUM_OF_RETRIES) {
630 m_stats_table_iter++;
631 if (m_stats_table_iter == m_statsTable.end())
632 m_stats_table_iter = m_statsTable.begin();
633 }
634 onFailed(interest.getName(), "Unknown failure");
635 };
636
637 m_face.expressInterest(*interest, dataReceived, dataFailed);
638}
639
640shared_ptr<Interest>
641TorrentManager::createInterest(Name name)
642{
643 shared_ptr<Interest> interest = make_shared<Interest>(name);
644 interest->setInterestLifetime(time::milliseconds(2000));
645 interest->setMustBeFresh(true);
646
647 // Select routable prefix
648 Link link(name, { {1, m_stats_table_iter->getRecordName()} });
649 m_keyChain->sign(link, signingWithSha256());
650 Block linkWire = link.wireEncode();
651
652 // Stats Table update here...
653 m_stats_table_iter->incrementSentInterests();
654
655 m_sortingCounter++;
656 if (m_sortingCounter >= SORTING_INTERVAL) {
657 m_sortingCounter = 0;
658 m_statsTable.sort();
659 m_stats_table_iter = m_statsTable.begin();
660 m_retries = 0;
661 }
662
663 interest->setLink(linkWire);
664
665 return interest;
666}
667
spirosmastorakis50642f82016-04-08 12:11:18 -0700668shared_ptr<Name>
669TorrentManager::findTorrentFileSegmentToDownload()
670{
671 // if we have no segments
672 if (m_torrentSegments.empty()) {
673 return make_shared<Name>(m_torrentFileName);
674 }
675 // otherwise just return the next segment ptr of the last segment we have
676 return m_torrentSegments.back().getTorrentFilePtr();
677}
678
679shared_ptr<Name>
680TorrentManager::findManifestSegmentToDownload(const Name& manifestName)
681{
682 //sequentially find whether we have downloaded any segments of this manifest file
683 Name manifestPrefix = manifestName.getSubName(0, manifestName.size() - 2);
684 auto it = std::find_if(m_fileManifests.rbegin(), m_fileManifests.rend(),
685 [&manifestPrefix] (const FileManifest& f) {
686 return manifestPrefix.isPrefixOf(f.getName());
687 });
688
689 // if we do not have any segments of the file manifest
690 if (it == m_fileManifests.rend()) {
691 return make_shared<Name>(manifestName);
692 }
693
694 // if we already have the requested segment of the file manifest
695 if (it->submanifest_number() >= manifestName.get(manifestName.size() - 2).toSequenceNumber()) {
696 return it->submanifest_ptr();
697 }
698 // if we do not have the requested segment
699 else {
700 return make_shared<Name>(manifestName);
701 }
702}
703
704bool
705TorrentManager::dataAlreadyDownloaded(const Name& dataName)
706{
707
708 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
709 [&dataName](const FileManifest& m) {
710 return m.getName().isPrefixOf(dataName);
711 });
712
713 // if we do not have the file manifest, just return false
714 if (manifest_it == m_fileManifests.end()) {
715 return false;
716 }
717
718 // find the pair of (std::shared_ptr<fs::fstream>, std::vector<bool>)
719 // that corresponds to the specific submanifest
720 auto& fileState = m_fileStates[manifest_it->getFullName()];
721
722 auto dataNum = dataName.get(dataName.size() - 2).toSequenceNumber();
723
724 // find whether we have the requested packet from the bitmap
725 return fileState.second[dataNum];
726}
727
728void
729TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
730{
731 std::vector<Name> manifests;
732 // insert the first segment name of all the file manifests to the vector
733 for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
734 manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
735 }
736 // for each file
737 for (const auto& manifestName : manifests) {
738 // find the first (if any) segment we are missing
739 shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
740 if (nullptr != manifestSegmentName) {
741 manifestNames.push_back(*manifestSegmentName);
742 }
743 }
744}
745
746void
747TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames)
748{
749 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
750 [&manifestName](const FileManifest& m) {
751 return m.name().getSubName(0, m.name().size()
752 - 1).isPrefixOf(manifestName);
753 });
754
755 for (auto j = manifest_it; j != m_fileManifests.end(); j++) {
756 auto& fileState = m_fileStates[j->getFullName()];
757 for (size_t dataNum = 0; dataNum < j->catalog().size(); ++dataNum) {
758 if (!fileState.second[dataNum]) {
759 packetNames.push_back(j->catalog()[dataNum]);
760 }
761 }
762
763 // check that the next manifest in the vector refers to the next segment of the same file
764 if ((j + 1) != m_fileManifests.end() && (j+1)->file_name() != manifest_it->file_name()) {
765 break;
766 }
767 }
768}
769
770void
771TorrentManager::findAllMissingDataPackets(std::vector<Name>& packetNames)
772{
773 for (auto j = m_fileManifests.begin(); j != m_fileManifests.end(); j++) {
774 auto& fileState = m_fileStates[j->getFullName()];
775 for (auto i = j->catalog().begin(); i != j->catalog().end(); i++) {
776 auto dataNum = i->get(i->size() - 2).toSequenceNumber();
777 if (!fileState.second[dataNum]) {
778 packetNames.push_back(*i);
779 }
780 }
781 }
782}
783
Mickey Sweatt527b0492016-03-02 11:07:48 -0800784} // end ntorrent
spirosmastorakisa46eee42016-04-05 14:24:45 -0700785} // end ndn