blob: c976cb17c4990255322847634273a0b04fbbaef2 [file] [log] [blame]
Mickey Sweatt527b0492016-03-02 11:07:48 -08001#include "torrent-manager.hpp"
2
3#include "file-manifest.hpp"
4#include "torrent-file.hpp"
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -07005#include "util/io-util.hpp"
Mickey Sweatt617d2d42016-04-25 22:02:08 -07006#include "util/logging.hpp"
Mickey Sweatt527b0492016-03-02 11:07:48 -08007
8#include <boost/filesystem.hpp>
9#include <boost/filesystem/fstream.hpp>
10
11#include <ndn-cxx/data.hpp>
12#include <ndn-cxx/security/key-chain.hpp>
13#include <ndn-cxx/security/signing-helpers.hpp>
14#include <ndn-cxx/util/io.hpp>
15
16#include <set>
17#include <string>
18#include <unordered_map>
19#include <vector>
20
21namespace fs = boost::filesystem;
22
23using std::string;
24using std::vector;
25
Mickey Sweatt527b0492016-03-02 11:07:48 -080026namespace ndn {
27namespace ntorrent {
28
Mickey Sweatt527b0492016-03-02 11:07:48 -080029static vector<TorrentFile>
30intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
31{
32 security::KeyChain key_chain;
33 Name currSegmentFullName = initialSegmentName;
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070034 vector<TorrentFile> torrentSegments = IoUtil::load_directory<TorrentFile>(torrentFilePath);
35
Mickey Sweatt527b0492016-03-02 11:07:48 -080036 // Starting with the initial segment name, verify the names, loading next name from torrentSegment
37 for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
38 TorrentFile& segment = *it;
39 key_chain.sign(segment, signingWithSha256());
40 if (segment.getFullName() != currSegmentFullName) {
41 vector<TorrentFile> correctSegments(torrentSegments.begin(), it);
42 torrentSegments.swap(correctSegments);
43 break;
44 }
45 // load the next full name
46 if (nullptr == segment.getTorrentFilePtr()) {
47 break;
48 }
49 currSegmentFullName = *segment.getTorrentFilePtr();
50 }
51 return torrentSegments;
52}
53
54static vector<FileManifest>
Mickey Sweatte908a5c2016-04-08 14:10:45 -070055intializeFileManifests(const string& manifestPath, const vector<TorrentFile>& torrentSegments)
Mickey Sweatt527b0492016-03-02 11:07:48 -080056{
57 security::KeyChain key_chain;
58
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070059 vector<FileManifest> manifests = IoUtil::load_directory<FileManifest>(manifestPath);
Mickey Sweatte908a5c2016-04-08 14:10:45 -070060 if (manifests.empty()) {
61 return manifests;
62 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070063
Mickey Sweatt527b0492016-03-02 11:07:48 -080064 // sign the manifests
65 std::for_each(manifests.begin(), manifests.end(),
66 [&key_chain](FileManifest& m){
67 key_chain.sign(m,signingWithSha256());
68 });
69
Mickey Sweatte908a5c2016-04-08 14:10:45 -070070 // put all names of initial manifests from the valid torrent files into a set
71 std::vector<ndn::Name> validInitialManifestNames;
Mickey Sweatt527b0492016-03-02 11:07:48 -080072 for (const auto& segment : torrentSegments) {
73 const auto& catalog = segment.getCatalog();
Mickey Sweatte908a5c2016-04-08 14:10:45 -070074 validInitialManifestNames.insert(validInitialManifestNames.end(),
75 catalog.begin(),
76 catalog.end());
Mickey Sweatt527b0492016-03-02 11:07:48 -080077 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -070078 auto manifest_it = manifests.begin();
79 std::vector<FileManifest> output;
80 output.reserve(manifests.size());
Mickey Sweatt527b0492016-03-02 11:07:48 -080081
Mickey Sweatte908a5c2016-04-08 14:10:45 -070082 for (auto& initialName : validInitialManifestNames) {
83 // starting from the initial segment
84 auto& validName = initialName;
85 if (manifests.end() == manifest_it) {
86 break;
87 }
88 auto fileName = manifest_it->file_name();
89 // sequential collect all valid segments
90 while (manifest_it != manifests.end() && manifest_it->getFullName() == validName) {
91 output.push_back(*manifest_it);
92 if (manifest_it->submanifest_ptr() != nullptr) {
93 validName = *manifest_it->submanifest_ptr();
94 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -080095 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -070096 else {
97 ++manifest_it;
98 break;
99 }
100 }
101 // skip the remain segments for this file (all invalid)
102 while (manifests.end() != manifest_it && manifest_it->file_name() == fileName) {
103 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800104 }
105 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700106 return output;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800107}
108
109static vector<Data>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700110initializeDataPackets(const string& filePath,
111 const FileManifest manifest,
112 size_t subManifestSize)
Mickey Sweatt527b0492016-03-02 11:07:48 -0800113{
114 vector<Data> packets;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700115 auto subManifestNum = manifest.submanifest_number();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800116
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700117 packets = IoUtil::packetize_file(filePath,
118 manifest.name(),
119 manifest.data_packet_size(),
120 subManifestSize,
121 subManifestNum);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800122
123 auto catalog = manifest.catalog();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800124 // Filter out invalid packet names
125 std::remove_if(packets.begin(), packets.end(),
126 [&packets, &catalog](const Data& p) {
127 return catalog.end() == std::find(catalog.begin(),
128 catalog.end(),
129 p.getFullName());
130 });
131 return packets;
132}
133
Mickey Sweattafda1f12016-04-04 17:15:11 -0700134static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700135initializeFileState(const string& dataPath,
136 const FileManifest& manifest,
137 size_t subManifestSize)
Mickey Sweattafda1f12016-04-04 17:15:11 -0700138{
139 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700140 auto fileName = manifest.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700141 auto filePath = dataPath + fileName;
142 vector<bool> fileBitMap(manifest.catalog().size());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700143 // if the file does not exist, create an empty placeholder (otherwise cannot set read-bit)
144 if (!fs::exists(filePath)) {
145 fs::ofstream fs(filePath);
146 fs << "";
Mickey Sweattafda1f12016-04-04 17:15:11 -0700147 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700148 auto s = std::make_shared<fs::fstream>(filePath,
149 fs::fstream::out
150 | fs::fstream::binary
151 | fs::fstream::in);
152 if (!*s) {
153 BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + filePath));
154 }
155 auto start_offset = manifest.submanifest_number() * subManifestSize * manifest.data_packet_size();
156 s->seekg(start_offset);
157 s->seekp(start_offset);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700158 return std::make_pair(s, fileBitMap);
159}
160
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700161//==================================================================================================
162// TorrentManager Implementation
163//==================================================================================================
164
Mickey Sweatt527b0492016-03-02 11:07:48 -0800165void TorrentManager::Initialize()
166{
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700167 // initialize the update handler
168
169 // figure out the name of the torrent
170 Name torrentName;
171 if (m_torrentFileName.get(m_torrentFileName.size() - 2).isSequenceNumber()) {
172 torrentName = m_torrentFileName.getSubName(1, m_torrentFileName.size() - 4);
173 }
174 else {
175 torrentName = m_torrentFileName.getSubName(1, m_torrentFileName.size() - 3);
176 }
177
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700178 // m_updateHandler = make_shared<UpdateHandler>(torrentName, m_keyChain,
179 // make_shared<StatsTable>(m_statsTable), m_face);
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700180
Mickey Sweatt527b0492016-03-02 11:07:48 -0800181 // .../<torrent_name>/torrent-file/<implicit_digest>
182 string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
183 string manifestPath = dataPath +"/manifests";
184 string torrentFilePath = dataPath +"/torrent_files";
185
186 // get the torrent file segments and manifests that we have.
187 if (!fs::exists(torrentFilePath)) {
188 return;
189 }
190 m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
191 if (m_torrentSegments.empty()) {
192 return;
193 }
194 m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700195
196 // get the submanifest sizes
Mickey Sweatt527b0492016-03-02 11:07:48 -0800197 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700198 if (m.submanifest_number() == 0) {
199 auto manifestFileName = m.file_name();
200 m_subManifestSizes[manifestFileName] = m.catalog().size();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800201 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700202 }
203
204 for (const auto& m : m_fileManifests) {
Mickey Sweatt527b0492016-03-02 11:07:48 -0800205 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700206 auto fileName = m.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700207 fs::path filePath = m_dataPath + fileName;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800208 // If there are any valid packets, add corresponding state to manager
Mickey Sweattafda1f12016-04-04 17:15:11 -0700209 if (!fs::exists(filePath)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700210 if (!fs::exists(filePath.parent_path())) {
211 boost::filesystem::create_directories(filePath.parent_path());
212 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700213 continue;
214 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700215 auto packets = initializeDataPackets(filePath.string(), m, m_subManifestSizes[m.file_name()]);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800216 if (!packets.empty()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700217 m_fileStates[m.getFullName()] = initializeFileState(m_dataPath,
218 m,
219 m_subManifestSizes[m.file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700220 auto& fileBitMap = m_fileStates[m.getFullName()].second;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800221 auto read_it = packets.begin();
222 size_t i = 0;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700223 for (auto name : m.catalog()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700224 if (read_it == packets.end()) {
225 break;
226 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800227 if (name == read_it->getFullName()) {
228 ++read_it;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700229 fileBitMap[i] = true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800230 }
231 ++i;
232 }
233 for (const auto& d : packets) {
234 seed(d);
235 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800236 }
237 }
238 for (const auto& t : m_torrentSegments) {
239 seed(t);
240 }
241 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700242 seed(m);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800243 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700244}
Mickey Sweatt527b0492016-03-02 11:07:48 -0800245
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700246std::vector<Name>
247TorrentManager::downloadTorrentFile(const std::string& path)
248{
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700249 // check whether we should send out an "ALIVE" Interest
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700250 // if (m_updateHandler->needsUpdate()) {
251 // m_updateHandler->sendAliveInterest(m_stats_table_iter);
252 // }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700253 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
254 auto manifestNames = make_shared<std::vector<Name>>();
255 if (searchRes == nullptr) {
256 this->findFileManifestsToDownload(*manifestNames);
257 if (manifestNames->empty()) {
258 auto packetNames = make_shared<std::vector<Name>>();
259 this->findAllMissingDataPackets(*packetNames);
260 return *packetNames;
261 }
262 else {
263 return *manifestNames;
264 }
265 }
266 this->downloadTorrentFileSegment(m_torrentFileName, path, manifestNames,
267 false, {}, {});
268 return *manifestNames;
269}
270
271void
272TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
273 const std::string& path,
274 std::shared_ptr<std::vector<Name>> manifestNames,
275 bool async,
276 TorrentFileReceivedCallback onSuccess,
277 FailedCallback onFailed)
278{
279 shared_ptr<Interest> interest = createInterest(name);
280
281 auto dataReceived = [manifestNames, path, async, onSuccess, onFailed, this]
282 (const Interest& interest, const Data& data) {
283 // Stats Table update here...
284 m_stats_table_iter->incrementReceivedData();
285 m_retries = 0;
286
287 if (async) {
288 manifestNames->clear();
289 }
290
291 TorrentFile file(data.wireEncode());
292
293 // Write the torrent file segment to disk...
294 if (writeTorrentSegment(file, path)) {
295 // if successfully written, seed this data
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700296 seed(file);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700297 }
298
299 const std::vector<Name>& manifestCatalog = file.getCatalog();
300 manifestNames->insert(manifestNames->end(), manifestCatalog.begin(), manifestCatalog.end());
301
302 shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
303
304 if (async) {
305 onSuccess(*manifestNames);
306 }
307 if (nextSegmentPtr != nullptr) {
308 this->downloadTorrentFileSegment(*nextSegmentPtr, path, manifestNames,
309 async, onSuccess, onFailed);
310 }
311 };
312
313 auto dataFailed = [manifestNames, path, name, async, onSuccess, onFailed, this]
314 (const Interest& interest) {
315 ++m_retries;
316 if (m_retries >= MAX_NUM_OF_RETRIES) {
317 ++m_stats_table_iter;
318 if (m_stats_table_iter == m_statsTable.end()) {
319 m_stats_table_iter = m_statsTable.begin();
320 }
321 }
322 if (async) {
323 onFailed(interest.getName(), "Unknown error");
324 }
325 this->downloadTorrentFileSegment(name, path, manifestNames, async, onSuccess, onFailed);
326 };
327
328 m_face->expressInterest(*interest, dataReceived, dataFailed);
329
330 if (!async) {
331 m_face->processEvents();
332 }
333}
334
335void
336TorrentManager::downloadTorrentFile(const std::string& path,
337 TorrentFileReceivedCallback onSuccess,
338 FailedCallback onFailed)
339{
340 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
341 auto manifestNames = make_shared<std::vector<Name>>();
342 if (searchRes == nullptr) {
343 this->findFileManifestsToDownload(*manifestNames);
344 if (manifestNames->empty()) {
345 auto packetNames = make_shared<std::vector<Name>>();
346 this->findAllMissingDataPackets(*packetNames);
347 onSuccess(*packetNames);
348 return;
349 }
350 else {
351 onSuccess(*manifestNames);
352 return;
353 }
354 }
355 this->downloadTorrentFileSegment(*searchRes, path, manifestNames,
356 true, onSuccess, onFailed);
357}
358
359void
360TorrentManager::download_file_manifest(const Name& manifestName,
361 const std::string& path,
362 TorrentManager::ManifestReceivedCallback onSuccess,
363 TorrentManager::FailedCallback onFailed)
364{
365 shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
366 auto packetNames = make_shared<std::vector<Name>>();
367 if (searchRes == nullptr) {
368 this->findDataPacketsToDownload(manifestName, *packetNames);
369 onSuccess(*packetNames);
370 return;
371 }
372 this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
373}
374
375void
376TorrentManager::download_data_packet(const Name& packetName,
377 DataReceivedCallback onSuccess,
378 FailedCallback onFailed)
379{
380 if (this->dataAlreadyDownloaded(packetName)) {
381 onSuccess(packetName);
382 return;
383 }
384
385 shared_ptr<Interest> interest = this->createInterest(packetName);
386
387 auto dataReceived = [onSuccess, onFailed, this]
388 (const Interest& interest, const Data& data) {
389 // Write data to disk...
390 if(writeData(data)) {
391 seed(data);
392 }
393
394 // Stats Table update here...
395 m_stats_table_iter->incrementReceivedData();
396 m_retries = 0;
397 onSuccess(data.getName());
398 };
399 auto dataFailed = [onFailed, this]
400 (const Interest& interest) {
401 m_retries++;
402 if (m_retries >= MAX_NUM_OF_RETRIES) {
403 m_stats_table_iter++;
404 if (m_stats_table_iter == m_statsTable.end())
405 m_stats_table_iter = m_statsTable.begin();
406 }
407 onFailed(interest.getName(), "Unknown failure");
408 };
409
410 m_face->expressInterest(*interest, dataReceived, dataFailed);
411}
412
413void TorrentManager::seed(const Data& data) {
414 m_face->setInterestFilter(data.getFullName(),
415 bind(&TorrentManager::onInterestReceived, this, _1, _2),
416 RegisterPrefixSuccessCallback(),
417 bind(&TorrentManager::onRegisterFailed, this, _1, _2));
418}
419
420// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
421// Protected Helpers
422// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
423
Mickey Sweattafda1f12016-04-04 17:15:11 -0700424bool TorrentManager::writeData(const Data& packet)
425{
426 // find correct manifest
427 const auto& packetName = packet.getName();
428 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
429 [&packetName](const FileManifest& m) {
430 return m.getName().isPrefixOf(packetName);
431 });
432 if (m_fileManifests.end() == manifest_it) {
433 return false;
434 }
435 // get file state out
436 auto& fileState = m_fileStates[manifest_it->getFullName()];
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700437
Mickey Sweattafda1f12016-04-04 17:15:11 -0700438 // if there is no open stream to the file
439 if (nullptr == fileState.first) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700440 fs::path filePath = m_dataPath + manifest_it->file_name();
441 if (!fs::exists(filePath)) {
442 fs::create_directories(filePath.parent_path());
443 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700444 fileState = initializeFileState(m_dataPath,
445 *manifest_it,
446 m_subManifestSizes[manifest_it->file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700447 }
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700448 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700449 // if we already have the packet, do not rewrite it.
450 if (fileState.second[packetNum]) {
451 return false;
452 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700453 // write data to disk
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700454 // TODO(msweatt) Fix this once code is merged
455 auto subManifestSize = m_subManifestSizes[manifest_it->file_name()];
456 if (IoUtil::writeData(packet, *manifest_it, subManifestSize, *fileState.first)) {
457 // update bitmap
458 fileState.second[packetNum] = true;
459 return true;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700460 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700461 return false;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800462}
463
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700464bool
465TorrentManager::writeTorrentSegment(const TorrentFile& segment, const std::string& path)
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700466{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700467 // validate the torrent
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700468 auto torrentPrefix = m_torrentFileName.getSubName(0, m_torrentFileName.size() - 1);
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700469 // check if we already have it
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700470 if (torrentPrefix.isPrefixOf(segment.getName()) &&
471 m_torrentSegments.end() == std::find(m_torrentSegments.begin(), m_torrentSegments.end(),
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700472 segment))
473 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700474 if(IoUtil::writeTorrentSegment(segment, path)) {
475 auto it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(),
476 [&segment](const TorrentFile& t){
477 return segment.getSegmentNumber() < t.getSegmentNumber() ;
478 });
479 m_torrentSegments.insert(it, segment);
480 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700481 }
482 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700483 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700484}
485
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700486
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700487bool TorrentManager::writeFileManifest(const FileManifest& manifest, const std::string& path)
488{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700489 if (m_fileManifests.end() == std::find(m_fileManifests.begin(), m_fileManifests.end(),
490 manifest))
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700491 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700492 // update the state of the manager
493 if (0 == manifest.submanifest_number()) {
494 m_subManifestSizes[manifest.file_name()] = manifest.catalog().size();
495 }
496 if(IoUtil::writeFileManifest(manifest, path)) {
497 // add to collection
498 auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
499 [&manifest](const FileManifest& m){
500 return m.file_name() > manifest.file_name()
501 || (m.file_name() == manifest.file_name()
502 && (m.submanifest_number() > manifest.submanifest_number()));
503 });
504 m_fileManifests.insert(it, manifest);
505 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700506 }
507 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700508 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700509}
510
spirosmastorakisa46eee42016-04-05 14:24:45 -0700511void
512TorrentManager::downloadFileManifestSegment(const Name& manifestName,
513 const std::string& path,
514 std::shared_ptr<std::vector<Name>> packetNames,
515 TorrentManager::ManifestReceivedCallback onSuccess,
516 TorrentManager::FailedCallback onFailed)
517{
518 shared_ptr<Interest> interest = this->createInterest(manifestName);
519
520 auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
521 (const Interest& interest, const Data& data) {
522 // Stats Table update here...
523 m_stats_table_iter->incrementReceivedData();
524 m_retries = 0;
525
526 FileManifest file(data.wireEncode());
527
528 // Write the file manifest segment to disk...
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700529 if(writeFileManifest(file, path)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700530 seed(file);
531 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700532 else {
533 onFailed(interest.getName(), "Write Failed");
534 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700535
536 const std::vector<Name>& packetsCatalog = file.catalog();
537 packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
538 shared_ptr<Name> nextSegmentPtr = file.submanifest_ptr();
539 if (nextSegmentPtr != nullptr) {
540 this->downloadFileManifestSegment(*nextSegmentPtr, path, packetNames, onSuccess, onFailed);
541 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700542 else {
spirosmastorakisa46eee42016-04-05 14:24:45 -0700543 onSuccess(*packetNames);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700544 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700545 };
546
547 auto dataFailed = [packetNames, path, manifestName, onFailed, this]
548 (const Interest& interest) {
549 m_retries++;
550 if (m_retries >= MAX_NUM_OF_RETRIES) {
551 m_stats_table_iter++;
552 if (m_stats_table_iter == m_statsTable.end())
553 m_stats_table_iter = m_statsTable.begin();
554 }
555 onFailed(interest.getName(), "Unknown failure");
556 };
557
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700558 m_face->expressInterest(*interest, dataReceived, dataFailed);
spirosmastorakisa46eee42016-04-05 14:24:45 -0700559}
560
561void
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700562TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700563{
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700564 // handle if it is a torrent-file
565 const auto& interestName = interest.getName();
566 std::shared_ptr<Data> data = nullptr;
567 auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700568 // determine if it is torrent file (that we have)
569 auto torrent_it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(), cmp);
570 if (m_torrentSegments.end() != torrent_it) {
571 data = std::make_shared<Data>(*torrent_it);
spirosmastorakis50642f82016-04-08 12:11:18 -0700572 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700573 else {
574 // determine if it is manifest (that we have)
575 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(), cmp);
576 if (m_fileManifests.end() != manifest_it) {
577 data = std::make_shared<Data>(*manifest_it) ;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700578 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700579 else {
580 // determine if it is data packet (that we have)
581 auto manifestName = interestName.getSubName(0, interestName.size() - 2);
582 auto map_it = std::find_if(m_fileStates.begin(), m_fileStates.end(),
583 [&manifestName](const std::pair<Name,
584 std::pair<std::shared_ptr<fs::fstream>,
585 std::vector<bool>>>& kv){
586 return manifestName.isPrefixOf(kv.first);
587 });
588 if (m_fileStates.end() != map_it) {
589 auto packetName = interestName.getSubName(0, interestName.size() - 1);
590 // get out the bitmap to be sure we have the packet
591 auto& fileState = map_it->second;
592 const auto &bitmap = fileState.second;
593 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
594 if (bitmap[packetNum]) {
595 // get the manifest
596 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
597 [&manifestName](const FileManifest& m) {
598 return manifestName.isPrefixOf(m.name());
599 });
600 auto manifestFileName = manifest_it->file_name();
601 auto filePath = m_dataPath + manifestFileName;
602 // TODO(msweatt) Explore why fileState stream does not work
603 fs::fstream is (filePath, fs::fstream::in | fs::fstream::binary);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700604 data = IoUtil::readDataPacket(interestName,
605 *manifest_it,
606 m_subManifestSizes[manifestFileName],
607 is);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700608 }
609 }
610 }
611 }
612 if (nullptr != data) {
613 m_face->put(*data);
614 }
615 else {
616 // TODO(msweatt) NACK
Mickey Sweatt617d2d42016-04-25 22:02:08 -0700617 LOG_ERROR << "NACK: " << interest << std::endl;
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700618 }
619 return;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700620}
621
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700622void
623TorrentManager::onRegisterFailed(const Name& prefix, const std::string& reason)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700624{
Mickey Sweatt617d2d42016-04-25 22:02:08 -0700625 LOG_ERROR << "ERROR: Failed to register prefix \""
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700626 << prefix << "\" in local hub's daemon (" << reason << ")"
627 << std::endl;
628 m_face->shutdown();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700629}
630
spirosmastorakis50642f82016-04-08 12:11:18 -0700631shared_ptr<Name>
632TorrentManager::findTorrentFileSegmentToDownload()
633{
634 // if we have no segments
635 if (m_torrentSegments.empty()) {
636 return make_shared<Name>(m_torrentFileName);
637 }
638 // otherwise just return the next segment ptr of the last segment we have
639 return m_torrentSegments.back().getTorrentFilePtr();
640}
641
642shared_ptr<Name>
643TorrentManager::findManifestSegmentToDownload(const Name& manifestName)
644{
645 //sequentially find whether we have downloaded any segments of this manifest file
646 Name manifestPrefix = manifestName.getSubName(0, manifestName.size() - 2);
647 auto it = std::find_if(m_fileManifests.rbegin(), m_fileManifests.rend(),
648 [&manifestPrefix] (const FileManifest& f) {
649 return manifestPrefix.isPrefixOf(f.getName());
650 });
651
652 // if we do not have any segments of the file manifest
653 if (it == m_fileManifests.rend()) {
654 return make_shared<Name>(manifestName);
655 }
656
657 // if we already have the requested segment of the file manifest
658 if (it->submanifest_number() >= manifestName.get(manifestName.size() - 2).toSequenceNumber()) {
659 return it->submanifest_ptr();
660 }
661 // if we do not have the requested segment
662 else {
663 return make_shared<Name>(manifestName);
664 }
665}
666
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700667void
668TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames)
669{
670 std::vector<Name> manifests;
671 // insert the first segment name of all the file manifests to the vector
672 for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
673 manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
674 }
675 // for each file
676 for (const auto& manifestName : manifests) {
677 // find the first (if any) segment we are missing
678 shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
679 if (nullptr != manifestSegmentName) {
680 manifestNames.push_back(*manifestSegmentName);
681 }
682 }
683}
684
spirosmastorakis50642f82016-04-08 12:11:18 -0700685bool
686TorrentManager::dataAlreadyDownloaded(const Name& dataName)
687{
688
689 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
690 [&dataName](const FileManifest& m) {
691 return m.getName().isPrefixOf(dataName);
692 });
693
694 // if we do not have the file manifest, just return false
695 if (manifest_it == m_fileManifests.end()) {
696 return false;
697 }
698
699 // find the pair of (std::shared_ptr<fs::fstream>, std::vector<bool>)
700 // that corresponds to the specific submanifest
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700701 auto fileState_it = m_fileStates.find(manifest_it->getFullName());
702 if (m_fileStates.end() != fileState_it) {
703 const auto& fileState = fileState_it->second;
704 auto dataNum = dataName.get(dataName.size() - 2).toSequenceNumber();
705 // find whether we have the requested packet from the bitmap
706 return fileState.second[dataNum];
707 }
708 return false;
spirosmastorakis50642f82016-04-08 12:11:18 -0700709}
710
711void
spirosmastorakis50642f82016-04-08 12:11:18 -0700712TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames)
713{
714 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
715 [&manifestName](const FileManifest& m) {
716 return m.name().getSubName(0, m.name().size()
717 - 1).isPrefixOf(manifestName);
718 });
719
720 for (auto j = manifest_it; j != m_fileManifests.end(); j++) {
721 auto& fileState = m_fileStates[j->getFullName()];
722 for (size_t dataNum = 0; dataNum < j->catalog().size(); ++dataNum) {
723 if (!fileState.second[dataNum]) {
724 packetNames.push_back(j->catalog()[dataNum]);
725 }
726 }
727
728 // check that the next manifest in the vector refers to the next segment of the same file
729 if ((j + 1) != m_fileManifests.end() && (j+1)->file_name() != manifest_it->file_name()) {
730 break;
731 }
732 }
733}
734
735void
736TorrentManager::findAllMissingDataPackets(std::vector<Name>& packetNames)
737{
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700738 for (auto j = m_fileManifests.begin(); j != m_fileManifests.end(); ++j) {
739 auto fileState_it = m_fileStates.find(j->getFullName());
740 // if we have no packets from this file
741 if (m_fileStates.end() == fileState_it) {
742 packetNames.reserve(packetNames.size() + j->catalog().size());
743 packetNames.insert(packetNames.end(), j->catalog().begin(), j->catalog().end());
744 }
745 // find the packets that we are missing
746 else {
747 const auto &fileState = fileState_it->second;
748 for (auto i = j->catalog().begin(); i != j->catalog().end(); i++) {
749 auto dataNum = i->get(i->size() - 2).toSequenceNumber();
750 if (!fileState.second[dataNum]) {
751 packetNames.push_back(*i);
752 }
spirosmastorakis50642f82016-04-08 12:11:18 -0700753 }
754 }
755 }
756}
757
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700758shared_ptr<Interest>
759TorrentManager::createInterest(Name name)
760{
761 shared_ptr<Interest> interest = make_shared<Interest>(name);
762 interest->setInterestLifetime(time::milliseconds(2000));
763 interest->setMustBeFresh(true);
764
765 // Select routable prefix
766 Link link(name, { {1, m_stats_table_iter->getRecordName()} });
767 m_keyChain->sign(link, signingWithSha256());
768 Block linkWire = link.wireEncode();
769
770 // Stats Table update here...
771 m_stats_table_iter->incrementSentInterests();
772
773 m_sortingCounter++;
774 if (m_sortingCounter >= SORTING_INTERVAL) {
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700775 // Use the sorting interval to send out "ALIVE" Interests as well
776 // check whether we should send out an "ALIVE" Interest
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700777 // if (m_updateHandler->needsUpdate()) {
778 // m_updateHandler->sendAliveInterest(m_stats_table_iter);
779 // }
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700780 // Do the actual sorting related stuff
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700781 m_sortingCounter = 0;
782 m_statsTable.sort();
783 m_stats_table_iter = m_statsTable.begin();
784 m_retries = 0;
785 }
786
787 interest->setLink(linkWire);
788
789 return interest;
790}
791
Mickey Sweatt527b0492016-03-02 11:07:48 -0800792} // end ntorrent
spirosmastorakisfd334462016-04-18 15:48:31 -0700793} // end ndn