blob: cb90908a57ad90da3a15b19b50662831691e7784 [file] [log] [blame]
Mickey Sweatt527b0492016-03-02 11:07:48 -08001#include "torrent-manager.hpp"
2
3#include "file-manifest.hpp"
4#include "torrent-file.hpp"
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -07005#include "util/io-util.hpp"
Mickey Sweatt527b0492016-03-02 11:07:48 -08006
7#include <boost/filesystem.hpp>
8#include <boost/filesystem/fstream.hpp>
9
10#include <ndn-cxx/data.hpp>
11#include <ndn-cxx/security/key-chain.hpp>
12#include <ndn-cxx/security/signing-helpers.hpp>
13#include <ndn-cxx/util/io.hpp>
14
15#include <set>
16#include <string>
17#include <unordered_map>
18#include <vector>
19
20namespace fs = boost::filesystem;
21
22using std::string;
23using std::vector;
24
Mickey Sweatt527b0492016-03-02 11:07:48 -080025namespace ndn {
26namespace ntorrent {
27
Mickey Sweatt527b0492016-03-02 11:07:48 -080028static vector<TorrentFile>
29intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
30{
31 security::KeyChain key_chain;
32 Name currSegmentFullName = initialSegmentName;
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070033 vector<TorrentFile> torrentSegments = IoUtil::load_directory<TorrentFile>(torrentFilePath);
34
Mickey Sweatt527b0492016-03-02 11:07:48 -080035 // Starting with the initial segment name, verify the names, loading next name from torrentSegment
36 for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
37 TorrentFile& segment = *it;
38 key_chain.sign(segment, signingWithSha256());
39 if (segment.getFullName() != currSegmentFullName) {
40 vector<TorrentFile> correctSegments(torrentSegments.begin(), it);
41 torrentSegments.swap(correctSegments);
42 break;
43 }
44 // load the next full name
45 if (nullptr == segment.getTorrentFilePtr()) {
46 break;
47 }
48 currSegmentFullName = *segment.getTorrentFilePtr();
49 }
50 return torrentSegments;
51}
52
53static vector<FileManifest>
Mickey Sweatte908a5c2016-04-08 14:10:45 -070054intializeFileManifests(const string& manifestPath, const vector<TorrentFile>& torrentSegments)
Mickey Sweatt527b0492016-03-02 11:07:48 -080055{
56 security::KeyChain key_chain;
57
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070058 vector<FileManifest> manifests = IoUtil::load_directory<FileManifest>(manifestPath);
Mickey Sweatte908a5c2016-04-08 14:10:45 -070059 if (manifests.empty()) {
60 return manifests;
61 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070062
Mickey Sweatt527b0492016-03-02 11:07:48 -080063 // sign the manifests
64 std::for_each(manifests.begin(), manifests.end(),
65 [&key_chain](FileManifest& m){
66 key_chain.sign(m,signingWithSha256());
67 });
68
Mickey Sweatte908a5c2016-04-08 14:10:45 -070069 // put all names of initial manifests from the valid torrent files into a set
70 std::vector<ndn::Name> validInitialManifestNames;
Mickey Sweatt527b0492016-03-02 11:07:48 -080071 for (const auto& segment : torrentSegments) {
72 const auto& catalog = segment.getCatalog();
Mickey Sweatte908a5c2016-04-08 14:10:45 -070073 validInitialManifestNames.insert(validInitialManifestNames.end(),
74 catalog.begin(),
75 catalog.end());
Mickey Sweatt527b0492016-03-02 11:07:48 -080076 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -070077 auto manifest_it = manifests.begin();
78 std::vector<FileManifest> output;
79 output.reserve(manifests.size());
80 auto validIvalidInitialManifestNames_it = validInitialManifestNames.begin();
Mickey Sweatt527b0492016-03-02 11:07:48 -080081
Mickey Sweatte908a5c2016-04-08 14:10:45 -070082 for (auto& initialName : validInitialManifestNames) {
83 // starting from the initial segment
84 auto& validName = initialName;
85 if (manifests.end() == manifest_it) {
86 break;
87 }
88 auto fileName = manifest_it->file_name();
89 // sequential collect all valid segments
90 while (manifest_it != manifests.end() && manifest_it->getFullName() == validName) {
91 output.push_back(*manifest_it);
92 if (manifest_it->submanifest_ptr() != nullptr) {
93 validName = *manifest_it->submanifest_ptr();
94 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -080095 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -070096 else {
97 ++manifest_it;
98 break;
99 }
100 }
101 // skip the remain segments for this file (all invalid)
102 while (manifests.end() != manifest_it && manifest_it->file_name() == fileName) {
103 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800104 }
105 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700106 return output;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800107}
108
109static vector<Data>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700110initializeDataPackets(const string& filePath,
111 const FileManifest manifest,
112 size_t subManifestSize)
Mickey Sweatt527b0492016-03-02 11:07:48 -0800113{
114 vector<Data> packets;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700115 auto subManifestNum = manifest.submanifest_number();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800116
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700117 packets = IoUtil::packetize_file(filePath,
118 manifest.name(),
119 manifest.data_packet_size(),
120 subManifestSize,
121 subManifestNum);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800122
123 auto catalog = manifest.catalog();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800124 // Filter out invalid packet names
125 std::remove_if(packets.begin(), packets.end(),
126 [&packets, &catalog](const Data& p) {
127 return catalog.end() == std::find(catalog.begin(),
128 catalog.end(),
129 p.getFullName());
130 });
131 return packets;
132}
133
Mickey Sweattafda1f12016-04-04 17:15:11 -0700134static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700135initializeFileState(const string& dataPath,
136 const FileManifest& manifest,
137 size_t subManifestSize)
Mickey Sweattafda1f12016-04-04 17:15:11 -0700138{
139 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700140 auto fileName = manifest.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700141 auto filePath = dataPath + fileName;
142 vector<bool> fileBitMap(manifest.catalog().size());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700143 // if the file does not exist, create an empty placeholder (otherwise cannot set read-bit)
144 if (!fs::exists(filePath)) {
145 fs::ofstream fs(filePath);
146 fs << "";
Mickey Sweattafda1f12016-04-04 17:15:11 -0700147 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700148 auto s = std::make_shared<fs::fstream>(filePath,
149 fs::fstream::out
150 | fs::fstream::binary
151 | fs::fstream::in);
152 if (!*s) {
153 BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + filePath));
154 }
155 auto start_offset = manifest.submanifest_number() * subManifestSize * manifest.data_packet_size();
156 s->seekg(start_offset);
157 s->seekp(start_offset);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700158 return std::make_pair(s, fileBitMap);
159}
160
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700161//==================================================================================================
162// TorrentManager Implementation
163//==================================================================================================
164
Mickey Sweatt527b0492016-03-02 11:07:48 -0800165void TorrentManager::Initialize()
166{
167 // .../<torrent_name>/torrent-file/<implicit_digest>
168 string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
169 string manifestPath = dataPath +"/manifests";
170 string torrentFilePath = dataPath +"/torrent_files";
171
172 // get the torrent file segments and manifests that we have.
173 if (!fs::exists(torrentFilePath)) {
174 return;
175 }
176 m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
177 if (m_torrentSegments.empty()) {
178 return;
179 }
180 m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700181
182 // get the submanifest sizes
Mickey Sweatt527b0492016-03-02 11:07:48 -0800183 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700184 if (m.submanifest_number() == 0) {
185 auto manifestFileName = m.file_name();
186 m_subManifestSizes[manifestFileName] = m.catalog().size();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800187 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700188 }
189
190 for (const auto& m : m_fileManifests) {
Mickey Sweatt527b0492016-03-02 11:07:48 -0800191 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700192 auto fileName = m.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700193 fs::path filePath = m_dataPath + fileName;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800194 // If there are any valid packets, add corresponding state to manager
Mickey Sweattafda1f12016-04-04 17:15:11 -0700195 if (!fs::exists(filePath)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700196 if (!fs::exists(filePath.parent_path())) {
197 boost::filesystem::create_directories(filePath.parent_path());
198 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700199 continue;
200 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700201 auto packets = initializeDataPackets(filePath.string(), m, m_subManifestSizes[m.file_name()]);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800202 if (!packets.empty()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700203 m_fileStates[m.getFullName()] = initializeFileState(m_dataPath,
204 m,
205 m_subManifestSizes[m.file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700206 auto& fileBitMap = m_fileStates[m.getFullName()].second;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800207 auto read_it = packets.begin();
208 size_t i = 0;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700209 for (auto name : m.catalog()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700210 if (read_it == packets.end()) {
211 break;
212 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800213 if (name == read_it->getFullName()) {
214 ++read_it;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700215 fileBitMap[i] = true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800216 }
217 ++i;
218 }
219 for (const auto& d : packets) {
220 seed(d);
221 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800222 }
223 }
224 for (const auto& t : m_torrentSegments) {
225 seed(t);
226 }
227 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700228 seed(m);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800229 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700230}
Mickey Sweatt527b0492016-03-02 11:07:48 -0800231
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700232std::vector<Name>
233TorrentManager::downloadTorrentFile(const std::string& path)
234{
235 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
236 auto manifestNames = make_shared<std::vector<Name>>();
237 if (searchRes == nullptr) {
238 this->findFileManifestsToDownload(*manifestNames);
239 if (manifestNames->empty()) {
240 auto packetNames = make_shared<std::vector<Name>>();
241 this->findAllMissingDataPackets(*packetNames);
242 return *packetNames;
243 }
244 else {
245 return *manifestNames;
246 }
247 }
248 this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
249 false, {}, {});
250 return *manifestNames;
251}
252
253void
254TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
255 const std::string& path,
256 std::shared_ptr<std::vector<Name>> manifestNames,
257 bool async,
258 TorrentFileReceivedCallback onSuccess,
259 FailedCallback onFailed)
260{
261 shared_ptr<Interest> interest = createInterest(name);
262
263 auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
264 (const Interest& interest, const Data& data) {
265 // Stats Table update here...
266 m_stats_table_iter->incrementReceivedData();
267 m_retries = 0;
268
269 if (async) {
270 manifestNames->clear();
271 }
272
273 TorrentFile file(data.wireEncode());
274
275 // Write the torrent file segment to disk...
276 if (writeTorrentSegment(file, path)) {
277 // if successfully written, seed this data
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700278 seed(file);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700279 }
280
281 const std::vector<Name>& manifestCatalog = file.getCatalog();
282 manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
283
284 shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
285
286 if (async) {
287 onSuccess(*manifestNames);
288 }
289 if (nextSegmentPtr != nullptr) {
290 this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
291 async, onSuccess, onFailed);
292 }
293 };
294
295 auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
296 (const Interest& interest) {
297 ++m_retries;
298 if (m_retries >= MAX_NUM_OF_RETRIES) {
299 ++m_stats_table_iter;
300 if (m_stats_table_iter == m_statsTable.end()) {
301 m_stats_table_iter = m_statsTable.begin();
302 }
303 }
304 if (async) {
305 onFailed(interest.getName(), "Unknown error");
306 }
307 this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
308 };
309
310 m_face->expressInterest(*interest, dataReceived, dataFailed);
311
312 if (!async) {
313 m_face->processEvents();
314 }
315}
316
317void
318TorrentManager::downloadTorrentFile(const std::string& path,
319 TorrentFileReceivedCallback onSuccess,
320 FailedCallback onFailed)
321{
322 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
323 auto manifestNames = make_shared<std::vector<Name>>();
324 if (searchRes == nullptr) {
325 this->findFileManifestsToDownload(*manifestNames);
326 if (manifestNames->empty()) {
327 auto packetNames = make_shared<std::vector<Name>>();
328 this->findAllMissingDataPackets(*packetNames);
329 onSuccess(*packetNames);
330 return;
331 }
332 else {
333 onSuccess(*manifestNames);
334 return;
335 }
336 }
337 this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
338 true, onSuccess, onFailed);
339}
340
341void
342TorrentManager::download_file_manifest(const Name& manifestName,
343 const std::string& path,
344 TorrentManager::ManifestReceivedCallback onSuccess,
345 TorrentManager::FailedCallback onFailed)
346{
347 shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
348 auto packetNames = make_shared<std::vector<Name>>();
349 if (searchRes == nullptr) {
350 this->findDataPacketsToDownload(manifestName, *packetNames);
351 onSuccess(*packetNames);
352 return;
353 }
354 this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
355}
356
357void
358TorrentManager::download_data_packet(const Name& packetName,
359 DataReceivedCallback onSuccess,
360 FailedCallback onFailed)
361{
362 if (this->dataAlreadyDownloaded(packetName)) {
363 onSuccess(packetName);
364 return;
365 }
366
367 shared_ptr<Interest> interest = this->createInterest(packetName);
368
369 auto dataReceived = [onSuccess, onFailed, this]
370 (const Interest& interest, const Data& data) {
371 // Write data to disk...
372 if(writeData(data)) {
373 seed(data);
374 }
375
376 // Stats Table update here...
377 m_stats_table_iter->incrementReceivedData();
378 m_retries = 0;
379 onSuccess(data.getName());
380 };
381 auto dataFailed = [onFailed, this]
382 (const Interest& interest) {
383 m_retries++;
384 if (m_retries >= MAX_NUM_OF_RETRIES) {
385 m_stats_table_iter++;
386 if (m_stats_table_iter == m_statsTable.end())
387 m_stats_table_iter = m_statsTable.begin();
388 }
389 onFailed(interest.getName(), "Unknown failure");
390 };
391
392 m_face->expressInterest(*interest, dataReceived, dataFailed);
393}
394
395void TorrentManager::seed(const Data& data) {
396 m_face->setInterestFilter(data.getFullName(),
397 bind(&TorrentManager::onInterestReceived, this, _1, _2),
398 RegisterPrefixSuccessCallback(),
399 bind(&TorrentManager::onRegisterFailed, this, _1, _2));
400}
401
402// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
403// Protected Helpers
404// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
405
Mickey Sweattafda1f12016-04-04 17:15:11 -0700406bool TorrentManager::writeData(const Data& packet)
407{
408 // find correct manifest
409 const auto& packetName = packet.getName();
410 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
411 [&packetName](const FileManifest& m) {
412 return m.getName().isPrefixOf(packetName);
413 });
414 if (m_fileManifests.end() == manifest_it) {
415 return false;
416 }
417 // get file state out
418 auto& fileState = m_fileStates[manifest_it->getFullName()];
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700419
Mickey Sweattafda1f12016-04-04 17:15:11 -0700420 // if there is no open stream to the file
421 if (nullptr == fileState.first) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700422 fs::path filePath = m_dataPath + manifest_it->file_name();
423 if (!fs::exists(filePath)) {
424 fs::create_directories(filePath.parent_path());
425 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700426 fileState = initializeFileState(m_dataPath,
427 *manifest_it,
428 m_subManifestSizes[manifest_it->file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700429 }
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700430 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700431 // if we already have the packet, do not rewrite it.
432 if (fileState.second[packetNum]) {
433 return false;
434 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700435 // write data to disk
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700436 // TODO(msweatt) Fix this once code is merged
437 auto subManifestSize = m_subManifestSizes[manifest_it->file_name()];
438 if (IoUtil::writeData(packet, *manifest_it, subManifestSize, *fileState.first)) {
439 // update bitmap
440 fileState.second[packetNum] = true;
441 return true;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700442 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700443 return false;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800444}
445
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700446bool
447TorrentManager::writeTorrentSegment(const TorrentFile& segment, const std::string& path)
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700448{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700449 // validate the torrent
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700450 auto torrentPrefix = m_torrentFileName.getSubName(0, m_torrentFileName.size() - 1);
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700451 // check if we already have it
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700452 if (torrentPrefix.isPrefixOf(segment.getName()) &&
453 m_torrentSegments.end() == std::find(m_torrentSegments.begin(), m_torrentSegments.end(),
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700454 segment))
455 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700456 if(IoUtil::writeTorrentSegment(segment, path)) {
457 auto it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(),
458 [&segment](const TorrentFile& t){
459 return segment.getSegmentNumber() < t.getSegmentNumber() ;
460 });
461 m_torrentSegments.insert(it, segment);
462 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700463 }
464 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700465 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700466}
467
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700468
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700469bool TorrentManager::writeFileManifest(const FileManifest& manifest, const std::string& path)
470{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700471 if (m_fileManifests.end() == std::find(m_fileManifests.begin(), m_fileManifests.end(),
472 manifest))
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700473 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700474 // update the state of the manager
475 if (0 == manifest.submanifest_number()) {
476 m_subManifestSizes[manifest.file_name()] = manifest.catalog().size();
477 }
478 if(IoUtil::writeFileManifest(manifest, path)) {
479 // add to collection
480 auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
481 [&manifest](const FileManifest& m){
482 return m.file_name() > manifest.file_name()
483 || (m.file_name() == manifest.file_name()
484 && (m.submanifest_number() > manifest.submanifest_number()));
485 });
486 m_fileManifests.insert(it, manifest);
487 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700488 }
489 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700490 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700491}
492
spirosmastorakisa46eee42016-04-05 14:24:45 -0700493void
494TorrentManager::downloadFileManifestSegment(const Name& manifestName,
495 const std::string& path,
496 std::shared_ptr<std::vector<Name>> packetNames,
497 TorrentManager::ManifestReceivedCallback onSuccess,
498 TorrentManager::FailedCallback onFailed)
499{
500 shared_ptr<Interest> interest = this->createInterest(manifestName);
501
502 auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
503 (const Interest& interest, const Data& data) {
504 // Stats Table update here...
505 m_stats_table_iter->incrementReceivedData();
506 m_retries = 0;
507
508 FileManifest file(data.wireEncode());
509
510 // Write the file manifest segment to disk...
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700511 if(writeFileManifest(file, path)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700512 seed(file);
513 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700514 else {
515 onFailed(interest.getName(), "Write Failed");
516 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700517
518 const std::vector<Name>& packetsCatalog = file.catalog();
519 packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
520 shared_ptr<Name> nextSegmentPtr = file.submanifest_ptr();
521 if (nextSegmentPtr != nullptr) {
522 this->downloadFileManifestSegment(*nextSegmentPtr, path, packetNames, onSuccess, onFailed);
523 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700524 else {
spirosmastorakisa46eee42016-04-05 14:24:45 -0700525 onSuccess(*packetNames);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700526 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700527 };
528
529 auto dataFailed = [packetNames, path, manifestName, onFailed, this]
530 (const Interest& interest) {
531 m_retries++;
532 if (m_retries >= MAX_NUM_OF_RETRIES) {
533 m_stats_table_iter++;
534 if (m_stats_table_iter == m_statsTable.end())
535 m_stats_table_iter = m_statsTable.begin();
536 }
537 onFailed(interest.getName(), "Unknown failure");
538 };
539
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700540 m_face->expressInterest(*interest, dataReceived, dataFailed);
spirosmastorakisa46eee42016-04-05 14:24:45 -0700541}
542
543void
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700544TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700545{
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700546 // handle if it is a torrent-file
547 const auto& interestName = interest.getName();
548 std::shared_ptr<Data> data = nullptr;
549 auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
550
551 // determine if it is torrent file (that we have)
552 auto torrent_it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(), cmp);
553 if (m_torrentSegments.end() != torrent_it) {
554 data = std::make_shared<Data>(*torrent_it);
spirosmastorakis50642f82016-04-08 12:11:18 -0700555 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700556 else {
557 // determine if it is manifest (that we have)
558 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(), cmp);
559 if (m_fileManifests.end() != manifest_it) {
560 data = std::make_shared<Data>(*manifest_it) ;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700561 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700562 else {
563 // determine if it is data packet (that we have)
564 auto manifestName = interestName.getSubName(0, interestName.size() - 2);
565 auto map_it = std::find_if(m_fileStates.begin(), m_fileStates.end(),
566 [&manifestName](const std::pair<Name,
567 std::pair<std::shared_ptr<fs::fstream>,
568 std::vector<bool>>>& kv){
569 return manifestName.isPrefixOf(kv.first);
570 });
571 if (m_fileStates.end() != map_it) {
572 auto packetName = interestName.getSubName(0, interestName.size() - 1);
573 // get out the bitmap to be sure we have the packet
574 auto& fileState = map_it->second;
575 const auto &bitmap = fileState.second;
576 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
577 if (bitmap[packetNum]) {
578 // get the manifest
579 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
580 [&manifestName](const FileManifest& m) {
581 return manifestName.isPrefixOf(m.name());
582 });
583 auto manifestFileName = manifest_it->file_name();
584 auto filePath = m_dataPath + manifestFileName;
585 // TODO(msweatt) Explore why fileState stream does not work
586 fs::fstream is (filePath, fs::fstream::in | fs::fstream::binary);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700587 data = IoUtil::readDataPacket(interestName,
588 *manifest_it,
589 m_subManifestSizes[manifestFileName],
590 is);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700591 }
592 }
593 }
594 }
595 if (nullptr != data) {
596 m_face->put(*data);
597 }
598 else {
599 // TODO(msweatt) NACK
600 std::cerr << "NACK: " << interest << std::endl;
601 }
602 return;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700603}
604
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700605void
606TorrentManager::onRegisterFailed(const Name& prefix, const std::string& reason)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700607{
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700608 std::cerr << "ERROR: Failed to register prefix \""
609 << prefix << "\" in local hub's daemon (" << reason << ")"
610 << std::endl;
611 m_face->shutdown();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700612}
613
spirosmastorakis50642f82016-04-08 12:11:18 -0700614shared_ptr<Name>
615TorrentManager::findTorrentFileSegmentToDownload()
616{
617 // if we have no segments
618 if (m_torrentSegments.empty()) {
619 return make_shared<Name>(m_torrentFileName);
620 }
621 // otherwise just return the next segment ptr of the last segment we have
622 return m_torrentSegments.back().getTorrentFilePtr();
623}
624
625shared_ptr<Name>
626TorrentManager::findManifestSegmentToDownload(const Name& manifestName)
627{
628 //sequentially find whether we have downloaded any segments of this manifest file
629 Name manifestPrefix = manifestName.getSubName(0, manifestName.size() - 2);
630 auto it = std::find_if(m_fileManifests.rbegin(), m_fileManifests.rend(),
631 [&manifestPrefix] (const FileManifest& f) {
632 return manifestPrefix.isPrefixOf(f.getName());
633 });
634
635 // if we do not have any segments of the file manifest
636 if (it == m_fileManifests.rend()) {
637 return make_shared<Name>(manifestName);
638 }
639
640 // if we already have the requested segment of the file manifest
641 if (it->submanifest_number() >= manifestName.get(manifestName.size() - 2).toSequenceNumber()) {
642 return it->submanifest_ptr();
643 }
644 // if we do not have the requested segment
645 else {
646 return make_shared<Name>(manifestName);
647 }
648}
649
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700650void
651TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
652{
653 std::vector<Name> manifests;
654 // insert the first segment name of all the file manifests to the vector
655 for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
656 manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
657 }
658 // for each file
659 for (const auto& manifestName : manifests) {
660 // find the first (if any) segment we are missing
661 shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
662 if (nullptr != manifestSegmentName) {
663 manifestNames.push_back(*manifestSegmentName);
664 }
665 }
666}
667
spirosmastorakis50642f82016-04-08 12:11:18 -0700668bool
669TorrentManager::dataAlreadyDownloaded(const Name& dataName)
670{
671
672 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
673 [&dataName](const FileManifest& m) {
674 return m.getName().isPrefixOf(dataName);
675 });
676
677 // if we do not have the file manifest, just return false
678 if (manifest_it == m_fileManifests.end()) {
679 return false;
680 }
681
682 // find the pair of (std::shared_ptr<fs::fstream>, std::vector<bool>)
683 // that corresponds to the specific submanifest
684 auto& fileState = m_fileStates[manifest_it->getFullName()];
685
686 auto dataNum = dataName.get(dataName.size() - 2).toSequenceNumber();
687
688 // find whether we have the requested packet from the bitmap
689 return fileState.second[dataNum];
690}
691
692void
spirosmastorakis50642f82016-04-08 12:11:18 -0700693TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames)
694{
695 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
696 [&manifestName](const FileManifest& m) {
697 return m.name().getSubName(0, m.name().size()
698 - 1).isPrefixOf(manifestName);
699 });
700
701 for (auto j = manifest_it; j != m_fileManifests.end(); j++) {
702 auto& fileState = m_fileStates[j->getFullName()];
703 for (size_t dataNum = 0; dataNum < j->catalog().size(); ++dataNum) {
704 if (!fileState.second[dataNum]) {
705 packetNames.push_back(j->catalog()[dataNum]);
706 }
707 }
708
709 // check that the next manifest in the vector refers to the next segment of the same file
710 if ((j + 1) != m_fileManifests.end() && (j+1)->file_name() != manifest_it->file_name()) {
711 break;
712 }
713 }
714}
715
716void
717TorrentManager::findAllMissingDataPackets(std::vector<Name>& packetNames)
718{
719 for (auto j = m_fileManifests.begin(); j != m_fileManifests.end(); j++) {
720 auto& fileState = m_fileStates[j->getFullName()];
721 for (auto i = j->catalog().begin(); i != j->catalog().end(); i++) {
722 auto dataNum = i->get(i->size() - 2).toSequenceNumber();
723 if (!fileState.second[dataNum]) {
724 packetNames.push_back(*i);
725 }
726 }
727 }
728}
729
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700730shared_ptr<Interest>
731TorrentManager::createInterest(Name name)
732{
733 shared_ptr<Interest> interest = make_shared<Interest>(name);
734 interest->setInterestLifetime(time::milliseconds(2000));
735 interest->setMustBeFresh(true);
736
737 // Select routable prefix
738 Link link(name, { {1, m_stats_table_iter->getRecordName()} });
739 m_keyChain->sign(link, signingWithSha256());
740 Block linkWire = link.wireEncode();
741
742 // Stats Table update here...
743 m_stats_table_iter->incrementSentInterests();
744
745 m_sortingCounter++;
746 if (m_sortingCounter >= SORTING_INTERVAL) {
747 m_sortingCounter = 0;
748 m_statsTable.sort();
749 m_stats_table_iter = m_statsTable.begin();
750 m_retries = 0;
751 }
752
753 interest->setLink(linkWire);
754
755 return interest;
756}
757
Mickey Sweatt527b0492016-03-02 11:07:48 -0800758} // end ntorrent
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700759} // end ndn