blob: 790e166a5894db7d16f5c54e2e61f4400efc6aaf [file] [log] [blame]
Mickey Sweatt527b0492016-03-02 11:07:48 -08001#include "torrent-manager.hpp"
2
3#include "file-manifest.hpp"
4#include "torrent-file.hpp"
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -07005#include "util/io-util.hpp"
Mickey Sweatt527b0492016-03-02 11:07:48 -08006
7#include <boost/filesystem.hpp>
8#include <boost/filesystem/fstream.hpp>
9
10#include <ndn-cxx/data.hpp>
11#include <ndn-cxx/security/key-chain.hpp>
12#include <ndn-cxx/security/signing-helpers.hpp>
13#include <ndn-cxx/util/io.hpp>
14
15#include <set>
16#include <string>
17#include <unordered_map>
18#include <vector>
19
20namespace fs = boost::filesystem;
21
22using std::string;
23using std::vector;
24
Mickey Sweatt527b0492016-03-02 11:07:48 -080025namespace ndn {
26namespace ntorrent {
27
Mickey Sweatt527b0492016-03-02 11:07:48 -080028static vector<TorrentFile>
29intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
30{
31 security::KeyChain key_chain;
32 Name currSegmentFullName = initialSegmentName;
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070033 vector<TorrentFile> torrentSegments = IoUtil::load_directory<TorrentFile>(torrentFilePath);
34
Mickey Sweatt527b0492016-03-02 11:07:48 -080035 // Starting with the initial segment name, verify the names, loading next name from torrentSegment
36 for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
37 TorrentFile& segment = *it;
38 key_chain.sign(segment, signingWithSha256());
39 if (segment.getFullName() != currSegmentFullName) {
40 vector<TorrentFile> correctSegments(torrentSegments.begin(), it);
41 torrentSegments.swap(correctSegments);
42 break;
43 }
44 // load the next full name
45 if (nullptr == segment.getTorrentFilePtr()) {
46 break;
47 }
48 currSegmentFullName = *segment.getTorrentFilePtr();
49 }
50 return torrentSegments;
51}
52
53static vector<FileManifest>
Mickey Sweatte908a5c2016-04-08 14:10:45 -070054intializeFileManifests(const string& manifestPath, const vector<TorrentFile>& torrentSegments)
Mickey Sweatt527b0492016-03-02 11:07:48 -080055{
56 security::KeyChain key_chain;
57
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070058 vector<FileManifest> manifests = IoUtil::load_directory<FileManifest>(manifestPath);
Mickey Sweatte908a5c2016-04-08 14:10:45 -070059 if (manifests.empty()) {
60 return manifests;
61 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070062
Mickey Sweatt527b0492016-03-02 11:07:48 -080063 // sign the manifests
64 std::for_each(manifests.begin(), manifests.end(),
65 [&key_chain](FileManifest& m){
66 key_chain.sign(m,signingWithSha256());
67 });
68
Mickey Sweatte908a5c2016-04-08 14:10:45 -070069 // put all names of initial manifests from the valid torrent files into a set
70 std::vector<ndn::Name> validInitialManifestNames;
Mickey Sweatt527b0492016-03-02 11:07:48 -080071 for (const auto& segment : torrentSegments) {
72 const auto& catalog = segment.getCatalog();
Mickey Sweatte908a5c2016-04-08 14:10:45 -070073 validInitialManifestNames.insert(validInitialManifestNames.end(),
74 catalog.begin(),
75 catalog.end());
Mickey Sweatt527b0492016-03-02 11:07:48 -080076 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -070077 auto manifest_it = manifests.begin();
78 std::vector<FileManifest> output;
79 output.reserve(manifests.size());
Mickey Sweatt527b0492016-03-02 11:07:48 -080080
Mickey Sweatte908a5c2016-04-08 14:10:45 -070081 for (auto& initialName : validInitialManifestNames) {
82 // starting from the initial segment
83 auto& validName = initialName;
84 if (manifests.end() == manifest_it) {
85 break;
86 }
87 auto fileName = manifest_it->file_name();
88 // sequential collect all valid segments
89 while (manifest_it != manifests.end() && manifest_it->getFullName() == validName) {
90 output.push_back(*manifest_it);
91 if (manifest_it->submanifest_ptr() != nullptr) {
92 validName = *manifest_it->submanifest_ptr();
93 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -080094 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -070095 else {
96 ++manifest_it;
97 break;
98 }
99 }
100 // skip the remain segments for this file (all invalid)
101 while (manifests.end() != manifest_it && manifest_it->file_name() == fileName) {
102 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800103 }
104 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700105 return output;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800106}
107
108static vector<Data>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700109initializeDataPackets(const string& filePath,
110 const FileManifest manifest,
111 size_t subManifestSize)
Mickey Sweatt527b0492016-03-02 11:07:48 -0800112{
113 vector<Data> packets;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700114 auto subManifestNum = manifest.submanifest_number();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800115
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700116 packets = IoUtil::packetize_file(filePath,
117 manifest.name(),
118 manifest.data_packet_size(),
119 subManifestSize,
120 subManifestNum);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800121
122 auto catalog = manifest.catalog();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800123 // Filter out invalid packet names
124 std::remove_if(packets.begin(), packets.end(),
125 [&packets, &catalog](const Data& p) {
126 return catalog.end() == std::find(catalog.begin(),
127 catalog.end(),
128 p.getFullName());
129 });
130 return packets;
131}
132
Mickey Sweattafda1f12016-04-04 17:15:11 -0700133static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700134initializeFileState(const string& dataPath,
135 const FileManifest& manifest,
136 size_t subManifestSize)
Mickey Sweattafda1f12016-04-04 17:15:11 -0700137{
138 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700139 auto fileName = manifest.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700140 auto filePath = dataPath + fileName;
141 vector<bool> fileBitMap(manifest.catalog().size());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700142 // if the file does not exist, create an empty placeholder (otherwise cannot set read-bit)
143 if (!fs::exists(filePath)) {
144 fs::ofstream fs(filePath);
145 fs << "";
Mickey Sweattafda1f12016-04-04 17:15:11 -0700146 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700147 auto s = std::make_shared<fs::fstream>(filePath,
148 fs::fstream::out
149 | fs::fstream::binary
150 | fs::fstream::in);
151 if (!*s) {
152 BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + filePath));
153 }
154 auto start_offset = manifest.submanifest_number() * subManifestSize * manifest.data_packet_size();
155 s->seekg(start_offset);
156 s->seekp(start_offset);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700157 return std::make_pair(s, fileBitMap);
158}
159
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700160//==================================================================================================
161// TorrentManager Implementation
162//==================================================================================================
163
Mickey Sweatt527b0492016-03-02 11:07:48 -0800164void TorrentManager::Initialize()
165{
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700166 // initialize the update handler
167
168 // figure out the name of the torrent
169 Name torrentName;
170 if (m_torrentFileName.get(m_torrentFileName.size() - 2).isSequenceNumber()) {
171 torrentName = m_torrentFileName.getSubName(1, m_torrentFileName.size() - 4);
172 }
173 else {
174 torrentName = m_torrentFileName.getSubName(1, m_torrentFileName.size() - 3);
175 }
176
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700177 // m_updateHandler = make_shared<UpdateHandler>(torrentName, m_keyChain,
178 // make_shared<StatsTable>(m_statsTable), m_face);
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700179
Mickey Sweatt527b0492016-03-02 11:07:48 -0800180 // .../<torrent_name>/torrent-file/<implicit_digest>
181 string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
182 string manifestPath = dataPath +"/manifests";
183 string torrentFilePath = dataPath +"/torrent_files";
184
185 // get the torrent file segments and manifests that we have.
186 if (!fs::exists(torrentFilePath)) {
187 return;
188 }
189 m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
190 if (m_torrentSegments.empty()) {
191 return;
192 }
193 m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700194
195 // get the submanifest sizes
Mickey Sweatt527b0492016-03-02 11:07:48 -0800196 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700197 if (m.submanifest_number() == 0) {
198 auto manifestFileName = m.file_name();
199 m_subManifestSizes[manifestFileName] = m.catalog().size();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800200 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700201 }
202
203 for (const auto& m : m_fileManifests) {
Mickey Sweatt527b0492016-03-02 11:07:48 -0800204 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700205 auto fileName = m.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700206 fs::path filePath = m_dataPath + fileName;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800207 // If there are any valid packets, add corresponding state to manager
Mickey Sweattafda1f12016-04-04 17:15:11 -0700208 if (!fs::exists(filePath)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700209 if (!fs::exists(filePath.parent_path())) {
210 boost::filesystem::create_directories(filePath.parent_path());
211 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700212 continue;
213 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700214 auto packets = initializeDataPackets(filePath.string(), m, m_subManifestSizes[m.file_name()]);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800215 if (!packets.empty()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700216 m_fileStates[m.getFullName()] = initializeFileState(m_dataPath,
217 m,
218 m_subManifestSizes[m.file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700219 auto& fileBitMap = m_fileStates[m.getFullName()].second;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800220 auto read_it = packets.begin();
221 size_t i = 0;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700222 for (auto name : m.catalog()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700223 if (read_it == packets.end()) {
224 break;
225 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800226 if (name == read_it->getFullName()) {
227 ++read_it;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700228 fileBitMap[i] = true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800229 }
230 ++i;
231 }
232 for (const auto& d : packets) {
233 seed(d);
234 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800235 }
236 }
237 for (const auto& t : m_torrentSegments) {
238 seed(t);
239 }
240 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700241 seed(m);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800242 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700243}
Mickey Sweatt527b0492016-03-02 11:07:48 -0800244
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700245std::vector<Name>
246TorrentManager::downloadTorrentFile(const std::string& path)
247{
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700248 // check whether we should send out an "ALIVE" Interest
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700249 // if (m_updateHandler->needsUpdate()) {
250 // m_updateHandler->sendAliveInterest(m_stats_table_iter);
251 // }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700252 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
253 auto manifestNames = make_shared<std::vector<Name>>();
254 if (searchRes == nullptr) {
255 this->findFileManifestsToDownload(*manifestNames);
256 if (manifestNames->empty()) {
257 auto packetNames = make_shared<std::vector<Name>>();
258 this->findAllMissingDataPackets(*packetNames);
259 return *packetNames;
260 }
261 else {
262 return *manifestNames;
263 }
264 }
265 this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
266 false, {}, {});
267 return *manifestNames;
268}
269
270void
271TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
272 const std::string& path,
273 std::shared_ptr<std::vector<Name>> manifestNames,
274 bool async,
275 TorrentFileReceivedCallback onSuccess,
276 FailedCallback onFailed)
277{
278 shared_ptr<Interest> interest = createInterest(name);
279
280 auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
281 (const Interest& interest, const Data& data) {
282 // Stats Table update here...
283 m_stats_table_iter->incrementReceivedData();
284 m_retries = 0;
285
286 if (async) {
287 manifestNames->clear();
288 }
289
290 TorrentFile file(data.wireEncode());
291
292 // Write the torrent file segment to disk...
293 if (writeTorrentSegment(file, path)) {
294 // if successfully written, seed this data
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700295 seed(file);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700296 }
297
298 const std::vector<Name>& manifestCatalog = file.getCatalog();
299 manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
300
301 shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
302
303 if (async) {
304 onSuccess(*manifestNames);
305 }
306 if (nextSegmentPtr != nullptr) {
307 this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
308 async, onSuccess, onFailed);
309 }
310 };
311
312 auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
313 (const Interest& interest) {
314 ++m_retries;
315 if (m_retries >= MAX_NUM_OF_RETRIES) {
316 ++m_stats_table_iter;
317 if (m_stats_table_iter == m_statsTable.end()) {
318 m_stats_table_iter = m_statsTable.begin();
319 }
320 }
321 if (async) {
322 onFailed(interest.getName(), "Unknown error");
323 }
324 this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
325 };
326
327 m_face->expressInterest(*interest, dataReceived, dataFailed);
328
329 if (!async) {
330 m_face->processEvents();
331 }
332}
333
334void
335TorrentManager::downloadTorrentFile(const std::string& path,
336 TorrentFileReceivedCallback onSuccess,
337 FailedCallback onFailed)
338{
339 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
340 auto manifestNames = make_shared<std::vector<Name>>();
341 if (searchRes == nullptr) {
342 this->findFileManifestsToDownload(*manifestNames);
343 if (manifestNames->empty()) {
344 auto packetNames = make_shared<std::vector<Name>>();
345 this->findAllMissingDataPackets(*packetNames);
346 onSuccess(*packetNames);
347 return;
348 }
349 else {
350 onSuccess(*manifestNames);
351 return;
352 }
353 }
354 this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
355 true, onSuccess, onFailed);
356}
357
358void
359TorrentManager::download_file_manifest(const Name& manifestName,
360 const std::string& path,
361 TorrentManager::ManifestReceivedCallback onSuccess,
362 TorrentManager::FailedCallback onFailed)
363{
364 shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
365 auto packetNames = make_shared<std::vector<Name>>();
366 if (searchRes == nullptr) {
367 this->findDataPacketsToDownload(manifestName, *packetNames);
368 onSuccess(*packetNames);
369 return;
370 }
371 this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
372}
373
374void
375TorrentManager::download_data_packet(const Name& packetName,
376 DataReceivedCallback onSuccess,
377 FailedCallback onFailed)
378{
379 if (this->dataAlreadyDownloaded(packetName)) {
380 onSuccess(packetName);
381 return;
382 }
383
384 shared_ptr<Interest> interest = this->createInterest(packetName);
385
386 auto dataReceived = [onSuccess, onFailed, this]
387 (const Interest& interest, const Data& data) {
388 // Write data to disk...
389 if(writeData(data)) {
390 seed(data);
391 }
392
393 // Stats Table update here...
394 m_stats_table_iter->incrementReceivedData();
395 m_retries = 0;
396 onSuccess(data.getName());
397 };
398 auto dataFailed = [onFailed, this]
399 (const Interest& interest) {
400 m_retries++;
401 if (m_retries >= MAX_NUM_OF_RETRIES) {
402 m_stats_table_iter++;
403 if (m_stats_table_iter == m_statsTable.end())
404 m_stats_table_iter = m_statsTable.begin();
405 }
406 onFailed(interest.getName(), "Unknown failure");
407 };
408
409 m_face->expressInterest(*interest, dataReceived, dataFailed);
410}
411
412void TorrentManager::seed(const Data& data) {
413 m_face->setInterestFilter(data.getFullName(),
414 bind(&TorrentManager::onInterestReceived, this, _1, _2),
415 RegisterPrefixSuccessCallback(),
416 bind(&TorrentManager::onRegisterFailed, this, _1, _2));
417}
418
419// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
420// Protected Helpers
421// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
422
Mickey Sweattafda1f12016-04-04 17:15:11 -0700423bool TorrentManager::writeData(const Data& packet)
424{
425 // find correct manifest
426 const auto& packetName = packet.getName();
427 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
428 [&packetName](const FileManifest& m) {
429 return m.getName().isPrefixOf(packetName);
430 });
431 if (m_fileManifests.end() == manifest_it) {
432 return false;
433 }
434 // get file state out
435 auto& fileState = m_fileStates[manifest_it->getFullName()];
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700436
Mickey Sweattafda1f12016-04-04 17:15:11 -0700437 // if there is no open stream to the file
438 if (nullptr == fileState.first) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700439 fs::path filePath = m_dataPath + manifest_it->file_name();
440 if (!fs::exists(filePath)) {
441 fs::create_directories(filePath.parent_path());
442 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700443 fileState = initializeFileState(m_dataPath,
444 *manifest_it,
445 m_subManifestSizes[manifest_it->file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700446 }
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700447 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700448 // if we already have the packet, do not rewrite it.
449 if (fileState.second[packetNum]) {
450 return false;
451 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700452 // write data to disk
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700453 // TODO(msweatt) Fix this once code is merged
454 auto subManifestSize = m_subManifestSizes[manifest_it->file_name()];
455 if (IoUtil::writeData(packet, *manifest_it, subManifestSize, *fileState.first)) {
456 // update bitmap
457 fileState.second[packetNum] = true;
458 return true;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700459 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700460 return false;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800461}
462
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700463bool
464TorrentManager::writeTorrentSegment(const TorrentFile& segment, const std::string& path)
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700465{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700466 // validate the torrent
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700467 auto torrentPrefix = m_torrentFileName.getSubName(0, m_torrentFileName.size() - 1);
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700468 // check if we already have it
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700469 if (torrentPrefix.isPrefixOf(segment.getName()) &&
470 m_torrentSegments.end() == std::find(m_torrentSegments.begin(), m_torrentSegments.end(),
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700471 segment))
472 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700473 if(IoUtil::writeTorrentSegment(segment, path)) {
474 auto it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(),
475 [&segment](const TorrentFile& t){
476 return segment.getSegmentNumber() < t.getSegmentNumber() ;
477 });
478 m_torrentSegments.insert(it, segment);
479 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700480 }
481 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700482 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700483}
484
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700485
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700486bool TorrentManager::writeFileManifest(const FileManifest& manifest, const std::string& path)
487{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700488 if (m_fileManifests.end() == std::find(m_fileManifests.begin(), m_fileManifests.end(),
489 manifest))
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700490 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700491 // update the state of the manager
492 if (0 == manifest.submanifest_number()) {
493 m_subManifestSizes[manifest.file_name()] = manifest.catalog().size();
494 }
495 if(IoUtil::writeFileManifest(manifest, path)) {
496 // add to collection
497 auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
498 [&manifest](const FileManifest& m){
499 return m.file_name() > manifest.file_name()
500 || (m.file_name() == manifest.file_name()
501 && (m.submanifest_number() > manifest.submanifest_number()));
502 });
503 m_fileManifests.insert(it, manifest);
504 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700505 }
506 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700507 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700508}
509
spirosmastorakisa46eee42016-04-05 14:24:45 -0700510void
511TorrentManager::downloadFileManifestSegment(const Name& manifestName,
512 const std::string& path,
513 std::shared_ptr<std::vector<Name>> packetNames,
514 TorrentManager::ManifestReceivedCallback onSuccess,
515 TorrentManager::FailedCallback onFailed)
516{
517 shared_ptr<Interest> interest = this->createInterest(manifestName);
518
519 auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
520 (const Interest& interest, const Data& data) {
521 // Stats Table update here...
522 m_stats_table_iter->incrementReceivedData();
523 m_retries = 0;
524
525 FileManifest file(data.wireEncode());
526
527 // Write the file manifest segment to disk...
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700528 if(writeFileManifest(file, path)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700529 seed(file);
530 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700531 else {
532 onFailed(interest.getName(), "Write Failed");
533 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700534
535 const std::vector<Name>& packetsCatalog = file.catalog();
536 packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
537 shared_ptr<Name> nextSegmentPtr = file.submanifest_ptr();
538 if (nextSegmentPtr != nullptr) {
539 this->downloadFileManifestSegment(*nextSegmentPtr, path, packetNames, onSuccess, onFailed);
540 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700541 else {
spirosmastorakisa46eee42016-04-05 14:24:45 -0700542 onSuccess(*packetNames);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700543 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700544 };
545
546 auto dataFailed = [packetNames, path, manifestName, onFailed, this]
547 (const Interest& interest) {
548 m_retries++;
549 if (m_retries >= MAX_NUM_OF_RETRIES) {
550 m_stats_table_iter++;
551 if (m_stats_table_iter == m_statsTable.end())
552 m_stats_table_iter = m_statsTable.begin();
553 }
554 onFailed(interest.getName(), "Unknown failure");
555 };
556
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700557 m_face->expressInterest(*interest, dataReceived, dataFailed);
spirosmastorakisa46eee42016-04-05 14:24:45 -0700558}
559
560void
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700561TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700562{
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700563 // handle if it is a torrent-file
564 const auto& interestName = interest.getName();
565 std::shared_ptr<Data> data = nullptr;
566 auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700567 // determine if it is torrent file (that we have)
568 auto torrent_it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(), cmp);
569 if (m_torrentSegments.end() != torrent_it) {
570 data = std::make_shared<Data>(*torrent_it);
spirosmastorakis50642f82016-04-08 12:11:18 -0700571 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700572 else {
573 // determine if it is manifest (that we have)
574 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(), cmp);
575 if (m_fileManifests.end() != manifest_it) {
576 data = std::make_shared<Data>(*manifest_it) ;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700577 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700578 else {
579 // determine if it is data packet (that we have)
580 auto manifestName = interestName.getSubName(0, interestName.size() - 2);
581 auto map_it = std::find_if(m_fileStates.begin(), m_fileStates.end(),
582 [&manifestName](const std::pair<Name,
583 std::pair<std::shared_ptr<fs::fstream>,
584 std::vector<bool>>>& kv){
585 return manifestName.isPrefixOf(kv.first);
586 });
587 if (m_fileStates.end() != map_it) {
588 auto packetName = interestName.getSubName(0, interestName.size() - 1);
589 // get out the bitmap to be sure we have the packet
590 auto& fileState = map_it->second;
591 const auto &bitmap = fileState.second;
592 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
593 if (bitmap[packetNum]) {
594 // get the manifest
595 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
596 [&manifestName](const FileManifest& m) {
597 return manifestName.isPrefixOf(m.name());
598 });
599 auto manifestFileName = manifest_it->file_name();
600 auto filePath = m_dataPath + manifestFileName;
601 // TODO(msweatt) Explore why fileState stream does not work
602 fs::fstream is (filePath, fs::fstream::in | fs::fstream::binary);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700603 data = IoUtil::readDataPacket(interestName,
604 *manifest_it,
605 m_subManifestSizes[manifestFileName],
606 is);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700607 }
608 }
609 }
610 }
611 if (nullptr != data) {
612 m_face->put(*data);
613 }
614 else {
615 // TODO(msweatt) NACK
616 std::cerr << "NACK: " << interest << std::endl;
617 }
618 return;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700619}
620
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700621void
622TorrentManager::onRegisterFailed(const Name& prefix, const std::string& reason)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700623{
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700624 std::cerr << "ERROR: Failed to register prefix \""
625 << prefix << "\" in local hub's daemon (" << reason << ")"
626 << std::endl;
627 m_face->shutdown();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700628}
629
spirosmastorakis50642f82016-04-08 12:11:18 -0700630shared_ptr<Name>
631TorrentManager::findTorrentFileSegmentToDownload()
632{
633 // if we have no segments
634 if (m_torrentSegments.empty()) {
635 return make_shared<Name>(m_torrentFileName);
636 }
637 // otherwise just return the next segment ptr of the last segment we have
638 return m_torrentSegments.back().getTorrentFilePtr();
639}
640
641shared_ptr<Name>
642TorrentManager::findManifestSegmentToDownload(const Name& manifestName)
643{
644 //sequentially find whether we have downloaded any segments of this manifest file
645 Name manifestPrefix = manifestName.getSubName(0, manifestName.size() - 2);
646 auto it = std::find_if(m_fileManifests.rbegin(), m_fileManifests.rend(),
647 [&manifestPrefix] (const FileManifest& f) {
648 return manifestPrefix.isPrefixOf(f.getName());
649 });
650
651 // if we do not have any segments of the file manifest
652 if (it == m_fileManifests.rend()) {
653 return make_shared<Name>(manifestName);
654 }
655
656 // if we already have the requested segment of the file manifest
657 if (it->submanifest_number() >= manifestName.get(manifestName.size() - 2).toSequenceNumber()) {
658 return it->submanifest_ptr();
659 }
660 // if we do not have the requested segment
661 else {
662 return make_shared<Name>(manifestName);
663 }
664}
665
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700666void
667TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
668{
669 std::vector<Name> manifests;
670 // insert the first segment name of all the file manifests to the vector
671 for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
672 manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
673 }
674 // for each file
675 for (const auto& manifestName : manifests) {
676 // find the first (if any) segment we are missing
677 shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
678 if (nullptr != manifestSegmentName) {
679 manifestNames.push_back(*manifestSegmentName);
680 }
681 }
682}
683
spirosmastorakis50642f82016-04-08 12:11:18 -0700684bool
685TorrentManager::dataAlreadyDownloaded(const Name& dataName)
686{
687
688 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
689 [&dataName](const FileManifest& m) {
690 return m.getName().isPrefixOf(dataName);
691 });
692
693 // if we do not have the file manifest, just return false
694 if (manifest_it == m_fileManifests.end()) {
695 return false;
696 }
697
698 // find the pair of (std::shared_ptr<fs::fstream>, std::vector<bool>)
699 // that corresponds to the specific submanifest
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700700 auto fileState_it = m_fileStates.find(manifest_it->getFullName());
701 if (m_fileStates.end() != fileState_it) {
702 const auto& fileState = fileState_it->second;
703 auto dataNum = dataName.get(dataName.size() - 2).toSequenceNumber();
704 // find whether we have the requested packet from the bitmap
705 return fileState.second[dataNum];
706 }
707 return false;
spirosmastorakis50642f82016-04-08 12:11:18 -0700708}
709
710void
spirosmastorakis50642f82016-04-08 12:11:18 -0700711TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames)
712{
713 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
714 [&manifestName](const FileManifest& m) {
715 return m.name().getSubName(0, m.name().size()
716 - 1).isPrefixOf(manifestName);
717 });
718
719 for (auto j = manifest_it; j != m_fileManifests.end(); j++) {
720 auto& fileState = m_fileStates[j->getFullName()];
721 for (size_t dataNum = 0; dataNum < j->catalog().size(); ++dataNum) {
722 if (!fileState.second[dataNum]) {
723 packetNames.push_back(j->catalog()[dataNum]);
724 }
725 }
726
727 // check that the next manifest in the vector refers to the next segment of the same file
728 if ((j + 1) != m_fileManifests.end() && (j+1)->file_name() != manifest_it->file_name()) {
729 break;
730 }
731 }
732}
733
734void
735TorrentManager::findAllMissingDataPackets(std::vector<Name>& packetNames)
736{
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700737 for (auto j = m_fileManifests.begin(); j != m_fileManifests.end(); ++j) {
738 auto fileState_it = m_fileStates.find(j->getFullName());
739 // if we have no packets from this file
740 if (m_fileStates.end() == fileState_it) {
741 packetNames.reserve(packetNames.size() + j->catalog().size());
742 packetNames.insert(packetNames.end(), j->catalog().begin(), j->catalog().end());
743 }
744 // find the packets that we are missing
745 else {
746 const auto &fileState = fileState_it->second;
747 for (auto i = j->catalog().begin(); i != j->catalog().end(); i++) {
748 auto dataNum = i->get(i->size() - 2).toSequenceNumber();
749 if (!fileState.second[dataNum]) {
750 packetNames.push_back(*i);
751 }
spirosmastorakis50642f82016-04-08 12:11:18 -0700752 }
753 }
754 }
755}
756
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700757shared_ptr<Interest>
758TorrentManager::createInterest(Name name)
759{
760 shared_ptr<Interest> interest = make_shared<Interest>(name);
761 interest->setInterestLifetime(time::milliseconds(2000));
762 interest->setMustBeFresh(true);
763
764 // Select routable prefix
765 Link link(name, { {1, m_stats_table_iter->getRecordName()} });
766 m_keyChain->sign(link, signingWithSha256());
767 Block linkWire = link.wireEncode();
768
769 // Stats Table update here...
770 m_stats_table_iter->incrementSentInterests();
771
772 m_sortingCounter++;
773 if (m_sortingCounter >= SORTING_INTERVAL) {
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700774 // Use the sorting interval to send out "ALIVE" Interests as well
775 // check whether we should send out an "ALIVE" Interest
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700776 // if (m_updateHandler->needsUpdate()) {
777 // m_updateHandler->sendAliveInterest(m_stats_table_iter);
778 // }
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700779 // Do the actual sorting related stuff
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700780 m_sortingCounter = 0;
781 m_statsTable.sort();
782 m_stats_table_iter = m_statsTable.begin();
783 m_retries = 0;
784 }
785
786 interest->setLink(linkWire);
787
788 return interest;
789}
790
Mickey Sweatt527b0492016-03-02 11:07:48 -0800791} // end ntorrent
spirosmastorakisfd334462016-04-18 15:48:31 -0700792} // end ndn