blob: ad7f8428fd64b3467a0cb66b8ae3c671d3029901 [file] [log] [blame]
spirosmastorakisd351c6b2016-05-06 17:02:48 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3* Copyright (c) 2016 Regents of the University of California.
4*
5* This file is part of the nTorrent codebase.
6*
7* nTorrent is free software: you can redistribute it and/or modify it under the
8* terms of the GNU Lesser General Public License as published by the Free Software
9* Foundation, either version 3 of the License, or (at your option) any later version.
10*
11* nTorrent is distributed in the hope that it will be useful, but WITHOUT ANY
12* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14*
15* You should have received copies of the GNU General Public License and GNU Lesser
16* General Public License along with nTorrent, e.g., in COPYING.md file. If not, see
17* <http://www.gnu.org/licenses/>.
18*
19* See AUTHORS for complete list of nTorrent authors and contributors.
20*/
21
Mickey Sweatt527b0492016-03-02 11:07:48 -080022#include "torrent-manager.hpp"
23
24#include "file-manifest.hpp"
25#include "torrent-file.hpp"
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070026#include "util/io-util.hpp"
Mickey Sweatt617d2d42016-04-25 22:02:08 -070027#include "util/logging.hpp"
Mickey Sweatt527b0492016-03-02 11:07:48 -080028
29#include <boost/filesystem.hpp>
30#include <boost/filesystem/fstream.hpp>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/security/key-chain.hpp>
34#include <ndn-cxx/security/signing-helpers.hpp>
35#include <ndn-cxx/util/io.hpp>
36
37#include <set>
38#include <string>
39#include <unordered_map>
40#include <vector>
Mickey Sweatt0dc0a1e2016-05-04 11:25:49 -070041#include <boost/asio/io_service.hpp>
Mickey Sweatt527b0492016-03-02 11:07:48 -080042
43namespace fs = boost::filesystem;
44
45using std::string;
46using std::vector;
47
Mickey Sweatt527b0492016-03-02 11:07:48 -080048namespace ndn {
49namespace ntorrent {
50
Mickey Sweatt527b0492016-03-02 11:07:48 -080051static vector<TorrentFile>
52intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
53{
54 security::KeyChain key_chain;
55 Name currSegmentFullName = initialSegmentName;
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070056 vector<TorrentFile> torrentSegments = IoUtil::load_directory<TorrentFile>(torrentFilePath);
Mickey Sweatt527b0492016-03-02 11:07:48 -080057 // Starting with the initial segment name, verify the names, loading next name from torrentSegment
58 for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
59 TorrentFile& segment = *it;
60 key_chain.sign(segment, signingWithSha256());
61 if (segment.getFullName() != currSegmentFullName) {
62 vector<TorrentFile> correctSegments(torrentSegments.begin(), it);
63 torrentSegments.swap(correctSegments);
64 break;
65 }
66 // load the next full name
67 if (nullptr == segment.getTorrentFilePtr()) {
68 break;
69 }
70 currSegmentFullName = *segment.getTorrentFilePtr();
71 }
72 return torrentSegments;
73}
74
75static vector<FileManifest>
Mickey Sweatte908a5c2016-04-08 14:10:45 -070076intializeFileManifests(const string& manifestPath, const vector<TorrentFile>& torrentSegments)
Mickey Sweatt527b0492016-03-02 11:07:48 -080077{
78 security::KeyChain key_chain;
79
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070080 vector<FileManifest> manifests = IoUtil::load_directory<FileManifest>(manifestPath);
Mickey Sweatte908a5c2016-04-08 14:10:45 -070081 if (manifests.empty()) {
82 return manifests;
83 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070084
Mickey Sweatt527b0492016-03-02 11:07:48 -080085 // sign the manifests
86 std::for_each(manifests.begin(), manifests.end(),
87 [&key_chain](FileManifest& m){
88 key_chain.sign(m,signingWithSha256());
89 });
90
Mickey Sweatte908a5c2016-04-08 14:10:45 -070091 // put all names of initial manifests from the valid torrent files into a set
92 std::vector<ndn::Name> validInitialManifestNames;
Mickey Sweatt527b0492016-03-02 11:07:48 -080093 for (const auto& segment : torrentSegments) {
94 const auto& catalog = segment.getCatalog();
Mickey Sweatte908a5c2016-04-08 14:10:45 -070095 validInitialManifestNames.insert(validInitialManifestNames.end(),
96 catalog.begin(),
97 catalog.end());
Mickey Sweatt527b0492016-03-02 11:07:48 -080098 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -070099 auto manifest_it = manifests.begin();
100 std::vector<FileManifest> output;
101 output.reserve(manifests.size());
Mickey Sweatt527b0492016-03-02 11:07:48 -0800102
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700103 for (auto& initialName : validInitialManifestNames) {
104 // starting from the initial segment
105 auto& validName = initialName;
106 if (manifests.end() == manifest_it) {
107 break;
108 }
109 auto fileName = manifest_it->file_name();
110 // sequential collect all valid segments
111 while (manifest_it != manifests.end() && manifest_it->getFullName() == validName) {
112 output.push_back(*manifest_it);
113 if (manifest_it->submanifest_ptr() != nullptr) {
114 validName = *manifest_it->submanifest_ptr();
115 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800116 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700117 else {
118 ++manifest_it;
119 break;
120 }
121 }
122 // skip the remain segments for this file (all invalid)
123 while (manifests.end() != manifest_it && manifest_it->file_name() == fileName) {
124 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800125 }
126 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700127 return output;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800128}
129
130static vector<Data>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700131initializeDataPackets(const string& filePath,
132 const FileManifest manifest,
133 size_t subManifestSize)
Mickey Sweatt527b0492016-03-02 11:07:48 -0800134{
135 vector<Data> packets;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700136 auto subManifestNum = manifest.submanifest_number();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800137
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700138 packets = IoUtil::packetize_file(filePath,
139 manifest.name(),
140 manifest.data_packet_size(),
141 subManifestSize,
142 subManifestNum);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800143
144 auto catalog = manifest.catalog();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800145 // Filter out invalid packet names
146 std::remove_if(packets.begin(), packets.end(),
147 [&packets, &catalog](const Data& p) {
148 return catalog.end() == std::find(catalog.begin(),
149 catalog.end(),
150 p.getFullName());
151 });
152 return packets;
153}
154
Mickey Sweattafda1f12016-04-04 17:15:11 -0700155static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700156initializeFileState(const string& dataPath,
157 const FileManifest& manifest,
158 size_t subManifestSize)
Mickey Sweattafda1f12016-04-04 17:15:11 -0700159{
160 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700161 auto fileName = manifest.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700162 auto filePath = dataPath + fileName;
163 vector<bool> fileBitMap(manifest.catalog().size());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700164 // if the file does not exist, create an empty placeholder (otherwise cannot set read-bit)
165 if (!fs::exists(filePath)) {
166 fs::ofstream fs(filePath);
167 fs << "";
Mickey Sweattafda1f12016-04-04 17:15:11 -0700168 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700169 auto s = std::make_shared<fs::fstream>(filePath,
170 fs::fstream::out
171 | fs::fstream::binary
172 | fs::fstream::in);
173 if (!*s) {
174 BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + filePath));
175 }
176 auto start_offset = manifest.submanifest_number() * subManifestSize * manifest.data_packet_size();
177 s->seekg(start_offset);
178 s->seekp(start_offset);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700179 return std::make_pair(s, fileBitMap);
180}
181
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700182//==================================================================================================
183// TorrentManager Implementation
184//==================================================================================================
185
Mickey Sweatt527b0492016-03-02 11:07:48 -0800186void TorrentManager::Initialize()
187{
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700188 // initialize the update handler
189
190 // figure out the name of the torrent
191 Name torrentName;
spirosmastorakisd351c6b2016-05-06 17:02:48 -0700192 Name scheme(SharedConstants::commonPrefix);
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700193 if (m_torrentFileName.get(m_torrentFileName.size() - 2).isSequenceNumber()) {
spirosmastorakisd351c6b2016-05-06 17:02:48 -0700194 torrentName = m_torrentFileName.getSubName(1 + scheme.size(), m_torrentFileName.size() - (4 + scheme.size()));
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700195 }
196 else {
spirosmastorakisd351c6b2016-05-06 17:02:48 -0700197 torrentName = m_torrentFileName.getSubName(1 + scheme.size(), m_torrentFileName.size() - (3 + scheme.size()));
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700198 }
199
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700200 // TODO(spyros) Get update manager working
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700201 // m_updateHandler = make_shared<UpdateHandler>(torrentName, m_keyChain,
202 // make_shared<StatsTable>(m_statsTable), m_face);
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700203
Mickey Sweatt527b0492016-03-02 11:07:48 -0800204 // .../<torrent_name>/torrent-file/<implicit_digest>
205 string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
206 string manifestPath = dataPath +"/manifests";
207 string torrentFilePath = dataPath +"/torrent_files";
208
209 // get the torrent file segments and manifests that we have.
210 if (!fs::exists(torrentFilePath)) {
211 return;
212 }
213 m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
214 if (m_torrentSegments.empty()) {
215 return;
216 }
217 m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700218
219 // get the submanifest sizes
Mickey Sweatt527b0492016-03-02 11:07:48 -0800220 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700221 if (m.submanifest_number() == 0) {
222 auto manifestFileName = m.file_name();
223 m_subManifestSizes[manifestFileName] = m.catalog().size();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800224 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700225 }
226
227 for (const auto& m : m_fileManifests) {
Mickey Sweatt527b0492016-03-02 11:07:48 -0800228 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700229 auto fileName = m.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700230 fs::path filePath = m_dataPath + fileName;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800231 // If there are any valid packets, add corresponding state to manager
Mickey Sweattafda1f12016-04-04 17:15:11 -0700232 if (!fs::exists(filePath)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700233 if (!fs::exists(filePath.parent_path())) {
234 boost::filesystem::create_directories(filePath.parent_path());
235 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700236 continue;
237 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700238 auto packets = initializeDataPackets(filePath.string(), m, m_subManifestSizes[m.file_name()]);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800239 if (!packets.empty()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700240 m_fileStates[m.getFullName()] = initializeFileState(m_dataPath,
241 m,
242 m_subManifestSizes[m.file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700243 auto& fileBitMap = m_fileStates[m.getFullName()].second;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800244 auto read_it = packets.begin();
245 size_t i = 0;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700246 for (auto name : m.catalog()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700247 if (read_it == packets.end()) {
248 break;
249 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800250 if (name == read_it->getFullName()) {
251 ++read_it;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700252 fileBitMap[i] = true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800253 }
254 ++i;
255 }
256 for (const auto& d : packets) {
257 seed(d);
258 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800259 }
260 }
261 for (const auto& t : m_torrentSegments) {
262 seed(t);
263 }
264 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700265 seed(m);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800266 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700267}
Mickey Sweatt527b0492016-03-02 11:07:48 -0800268
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700269shared_ptr<Name>
270TorrentManager::findTorrentFileSegmentToDownload() const
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700271{
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700272 // if we have no segments
273 if (m_torrentSegments.empty()) {
274 return make_shared<Name>(m_torrentFileName);
275 }
276 // otherwise just return the next segment ptr of the last segment we have
277 return m_torrentSegments.back().getTorrentFilePtr();
278}
279
280shared_ptr<Name>
281TorrentManager::findManifestSegmentToDownload(const Name& manifestName) const
282{
283 //sequentially find whether we have downloaded any segments of this manifest file
284 Name manifestPrefix = manifestName.getSubName(0, manifestName.size() - 2);
285 auto it = std::find_if(m_fileManifests.rbegin(), m_fileManifests.rend(),
286 [&manifestPrefix] (const FileManifest& f) {
287 return manifestPrefix.isPrefixOf(f.getName());
288 });
289
290 // if we do not have any segments of the file manifest
291 if (it == m_fileManifests.rend()) {
292 return make_shared<Name>(manifestName);
293 }
294
295 // if we already have the requested segment of the file manifest
296 if (it->submanifest_number() >= manifestName.get(manifestName.size() - 2).toSequenceNumber()) {
297 return it->submanifest_ptr();
298 }
299 // if we do not have the requested segment
300 else {
301 return make_shared<Name>(manifestName);
302 }
303}
304
305void
306TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames) const
307{
308 std::vector<Name> manifests;
309 // insert the first segment name of all the file manifests to the vector
310 for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
311 manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
312 }
313 // for each file
314 for (const auto& manifestName : manifests) {
315 // find the first (if any) segment we are missing
316 shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
317 if (nullptr != manifestSegmentName) {
318 manifestNames.push_back(*manifestSegmentName);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700319 }
320 }
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700321}
322
323bool
324TorrentManager::hasDataPacket(const Name& dataName) const
325{
326
327 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
328 [&dataName](const FileManifest& m) {
329 return m.getName().isPrefixOf(dataName);
330 });
331
332 // if we do not have the file manifest, just return false
333 if (manifest_it == m_fileManifests.end()) {
334 return false;
335 }
336
337 // find the pair of (std::shared_ptr<fs::fstream>, std::vector<bool>)
338 // that corresponds to the specific submanifest
339 auto fileState_it = m_fileStates.find(manifest_it->getFullName());
340 if (m_fileStates.end() != fileState_it) {
341 const auto& fileState = fileState_it->second;
342 auto dataNum = dataName.get(dataName.size() - 2).toSequenceNumber();
343 // find whether we have the requested packet from the bitmap
344 return fileState.second[dataNum];
345 }
346 return false;
347}
348
349void
350TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames) const
351{
352 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
353 [&manifestName](const FileManifest& m) {
354 return m.name().getSubName(0, m.name().size()
355 - 1).isPrefixOf(manifestName);
356 });
357
358 for (auto j = manifest_it; j != m_fileManifests.end(); j++) {
359 auto& fileState = m_fileStates[j->getFullName()];
360 for (size_t dataNum = 0; dataNum < j->catalog().size(); ++dataNum) {
361 if (!fileState.second[dataNum]) {
362 packetNames.push_back(j->catalog()[dataNum]);
363 }
364 }
365
366 // check that the next manifest in the vector refers to the next segment of the same file
367 if ((j + 1) != m_fileManifests.end() && (j+1)->file_name() != manifest_it->file_name()) {
368 break;
369 }
370 }
371}
372
373void
374TorrentManager::findAllMissingDataPackets(std::vector<Name>& packetNames) const
375{
376 for (auto j = m_fileManifests.begin(); j != m_fileManifests.end(); ++j) {
377 auto fileState_it = m_fileStates.find(j->getFullName());
378 // if we have no packets from this file
379 if (m_fileStates.end() == fileState_it) {
380 packetNames.reserve(packetNames.size() + j->catalog().size());
381 packetNames.insert(packetNames.end(), j->catalog().begin(), j->catalog().end());
382 }
383 // find the packets that we are missing
384 else {
385 const auto &fileState = fileState_it->second;
386 for (auto i = j->catalog().begin(); i != j->catalog().end(); i++) {
387 auto dataNum = i->get(i->size() - 2).toSequenceNumber();
388 if (!fileState.second[dataNum]) {
389 packetNames.push_back(*i);
390 }
391 }
392 }
393 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700394}
395
396void
397TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
398 const std::string& path,
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700399 TorrentFileReceivedCallback onSuccess,
400 FailedCallback onFailed)
401{
402 shared_ptr<Interest> interest = createInterest(name);
403
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700404 auto dataReceived = [path, onSuccess, onFailed, this]
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700405 (const Interest& interest, const Data& data) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700406 m_pendingInterests.erase(interest.getName());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700407 // Stats Table update here...
408 m_stats_table_iter->incrementReceivedData();
409 m_retries = 0;
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700410 std::vector<Name> manifestNames;
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700411 TorrentFile file(data.wireEncode());
412
413 // Write the torrent file segment to disk...
414 if (writeTorrentSegment(file, path)) {
415 // if successfully written, seed this data
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700416 seed(file);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700417 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700418 const std::vector<Name>& manifestCatalog = file.getCatalog();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700419 manifestNames.insert(manifestNames.end(), manifestCatalog.begin(), manifestCatalog.end());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700420
421 shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700422 if (onSuccess) {
423 onSuccess(manifestNames);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700424 }
425 if (nextSegmentPtr != nullptr) {
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700426 this->downloadTorrentFileSegment(*nextSegmentPtr, path, onSuccess, onFailed);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700427 }
spirosmastorakis9b68b532016-05-02 21:40:29 -0700428 this->sendInterest();
429 if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700430 shutdown();
431 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700432 };
433
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700434 auto dataFailed = [path, name, onSuccess, onFailed, this]
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700435 (const Interest& interest) {
spirosmastorakis9b68b532016-05-02 21:40:29 -0700436 m_pendingInterests.erase(interest.getName());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700437 ++m_retries;
438 if (m_retries >= MAX_NUM_OF_RETRIES) {
439 ++m_stats_table_iter;
440 if (m_stats_table_iter == m_statsTable.end()) {
441 m_stats_table_iter = m_statsTable.begin();
442 }
443 }
spirosmastorakis9b68b532016-05-02 21:40:29 -0700444 this->sendInterest();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700445 if (onFailed) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700446 onFailed(interest.getName(), "Unknown error");
447 }
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700448 this->downloadTorrentFileSegment(name, path, onSuccess, onFailed);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700449 };
spirosmastorakis9b68b532016-05-02 21:40:29 -0700450 LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
451 m_interestQueue->push(interest, dataReceived, dataFailed);
452 this->sendInterest();
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700453}
454
455void
456TorrentManager::downloadTorrentFile(const std::string& path,
457 TorrentFileReceivedCallback onSuccess,
458 FailedCallback onFailed)
459{
460 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
461 auto manifestNames = make_shared<std::vector<Name>>();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700462 if (searchRes != nullptr) {
463 this->downloadTorrentFileSegment(*searchRes, path, onSuccess, onFailed);
464 }
465 else {
466 std::vector<Name> manifests;
467 findFileManifestsToDownload(manifests);
468 if (onSuccess) {
469 onSuccess(manifests);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700470 }
471 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700472}
473
474void
475TorrentManager::download_file_manifest(const Name& manifestName,
476 const std::string& path,
477 TorrentManager::ManifestReceivedCallback onSuccess,
478 TorrentManager::FailedCallback onFailed)
479{
480 shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
481 auto packetNames = make_shared<std::vector<Name>>();
482 if (searchRes == nullptr) {
483 this->findDataPacketsToDownload(manifestName, *packetNames);
484 onSuccess(*packetNames);
485 return;
486 }
487 this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
488}
489
490void
491TorrentManager::download_data_packet(const Name& packetName,
492 DataReceivedCallback onSuccess,
493 FailedCallback onFailed)
494{
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700495 if (this->hasDataPacket(packetName)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700496 onSuccess(packetName);
497 return;
498 }
499
500 shared_ptr<Interest> interest = this->createInterest(packetName);
501
502 auto dataReceived = [onSuccess, onFailed, this]
503 (const Interest& interest, const Data& data) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700504 m_pendingInterests.erase(interest.getName());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700505 // Write data to disk...
506 if(writeData(data)) {
507 seed(data);
508 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700509 // Stats Table update here...
510 m_stats_table_iter->incrementReceivedData();
511 m_retries = 0;
512 onSuccess(data.getName());
spirosmastorakis9b68b532016-05-02 21:40:29 -0700513 this->sendInterest();
514 if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700515 shutdown();
516 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700517 };
spirosmastorakis9b68b532016-05-02 21:40:29 -0700518
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700519 auto dataFailed = [onFailed, this]
520 (const Interest& interest) {
521 m_retries++;
spirosmastorakis9b68b532016-05-02 21:40:29 -0700522 m_pendingInterests.erase(interest.getName());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700523 if (m_retries >= MAX_NUM_OF_RETRIES) {
524 m_stats_table_iter++;
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700525 if (m_stats_table_iter == m_statsTable.end()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700526 m_stats_table_iter = m_statsTable.begin();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700527 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700528 }
529 onFailed(interest.getName(), "Unknown failure");
spirosmastorakis9b68b532016-05-02 21:40:29 -0700530 this->sendInterest();
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700531 };
spirosmastorakis9b68b532016-05-02 21:40:29 -0700532 LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
533 m_interestQueue->push(interest, dataReceived, dataFailed);
534 this->sendInterest();
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700535}
536
537void TorrentManager::seed(const Data& data) {
538 m_face->setInterestFilter(data.getFullName(),
539 bind(&TorrentManager::onInterestReceived, this, _1, _2),
540 RegisterPrefixSuccessCallback(),
541 bind(&TorrentManager::onRegisterFailed, this, _1, _2));
542}
543
Mickey Sweatt0dc0a1e2016-05-04 11:25:49 -0700544void
545TorrentManager::shutdown()
546{
547 m_face->getIoService().stop();
548}
549
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700550// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
551// Protected Helpers
552// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
553
Mickey Sweattafda1f12016-04-04 17:15:11 -0700554bool TorrentManager::writeData(const Data& packet)
555{
556 // find correct manifest
557 const auto& packetName = packet.getName();
558 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
559 [&packetName](const FileManifest& m) {
560 return m.getName().isPrefixOf(packetName);
561 });
562 if (m_fileManifests.end() == manifest_it) {
563 return false;
564 }
565 // get file state out
566 auto& fileState = m_fileStates[manifest_it->getFullName()];
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700567
Mickey Sweattafda1f12016-04-04 17:15:11 -0700568 // if there is no open stream to the file
569 if (nullptr == fileState.first) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700570 fs::path filePath = m_dataPath + manifest_it->file_name();
571 if (!fs::exists(filePath)) {
572 fs::create_directories(filePath.parent_path());
573 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700574 fileState = initializeFileState(m_dataPath,
575 *manifest_it,
576 m_subManifestSizes[manifest_it->file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700577 }
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700578 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700579 // if we already have the packet, do not rewrite it.
580 if (fileState.second[packetNum]) {
581 return false;
582 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700583 // write data to disk
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700584 // TODO(msweatt) Fix this once code is merged
585 auto subManifestSize = m_subManifestSizes[manifest_it->file_name()];
586 if (IoUtil::writeData(packet, *manifest_it, subManifestSize, *fileState.first)) {
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700587 fileState.first->flush();
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700588 // update bitmap
589 fileState.second[packetNum] = true;
590 return true;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700591 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700592 return false;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800593}
594
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700595bool
596TorrentManager::writeTorrentSegment(const TorrentFile& segment, const std::string& path)
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700597{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700598 // validate the torrent
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700599 auto torrentPrefix = m_torrentFileName.getSubName(0, m_torrentFileName.size() - 1);
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700600 // check if we already have it
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700601 if (torrentPrefix.isPrefixOf(segment.getName()) &&
602 m_torrentSegments.end() == std::find(m_torrentSegments.begin(), m_torrentSegments.end(),
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700603 segment))
604 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700605 if(IoUtil::writeTorrentSegment(segment, path)) {
606 auto it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(),
607 [&segment](const TorrentFile& t){
608 return segment.getSegmentNumber() < t.getSegmentNumber() ;
609 });
610 m_torrentSegments.insert(it, segment);
611 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700612 }
613 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700614 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700615}
616
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700617
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700618bool TorrentManager::writeFileManifest(const FileManifest& manifest, const std::string& path)
619{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700620 if (m_fileManifests.end() == std::find(m_fileManifests.begin(), m_fileManifests.end(),
621 manifest))
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700622 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700623 // update the state of the manager
624 if (0 == manifest.submanifest_number()) {
625 m_subManifestSizes[manifest.file_name()] = manifest.catalog().size();
626 }
627 if(IoUtil::writeFileManifest(manifest, path)) {
628 // add to collection
629 auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
630 [&manifest](const FileManifest& m){
631 return m.file_name() > manifest.file_name()
632 || (m.file_name() == manifest.file_name()
633 && (m.submanifest_number() > manifest.submanifest_number()));
634 });
635 m_fileManifests.insert(it, manifest);
636 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700637 }
638 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700639 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700640}
641
spirosmastorakisa46eee42016-04-05 14:24:45 -0700642void
643TorrentManager::downloadFileManifestSegment(const Name& manifestName,
644 const std::string& path,
645 std::shared_ptr<std::vector<Name>> packetNames,
646 TorrentManager::ManifestReceivedCallback onSuccess,
647 TorrentManager::FailedCallback onFailed)
648{
649 shared_ptr<Interest> interest = this->createInterest(manifestName);
650
651 auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
652 (const Interest& interest, const Data& data) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700653 m_pendingInterests.erase(interest.getName());
spirosmastorakisa46eee42016-04-05 14:24:45 -0700654 // Stats Table update here...
655 m_stats_table_iter->incrementReceivedData();
656 m_retries = 0;
657
658 FileManifest file(data.wireEncode());
659
660 // Write the file manifest segment to disk...
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700661 if(writeFileManifest(file, path)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700662 seed(file);
663 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700664 else {
665 onFailed(interest.getName(), "Write Failed");
666 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700667
668 const std::vector<Name>& packetsCatalog = file.catalog();
669 packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
670 shared_ptr<Name> nextSegmentPtr = file.submanifest_ptr();
671 if (nextSegmentPtr != nullptr) {
672 this->downloadFileManifestSegment(*nextSegmentPtr, path, packetNames, onSuccess, onFailed);
673 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700674 else {
spirosmastorakisa46eee42016-04-05 14:24:45 -0700675 onSuccess(*packetNames);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700676 }
spirosmastorakis9b68b532016-05-02 21:40:29 -0700677 this->sendInterest();
678 if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700679 shutdown();
680 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700681 };
682
683 auto dataFailed = [packetNames, path, manifestName, onFailed, this]
684 (const Interest& interest) {
spirosmastorakis9b68b532016-05-02 21:40:29 -0700685 m_pendingInterests.erase(interest.getName());
spirosmastorakisa46eee42016-04-05 14:24:45 -0700686 m_retries++;
687 if (m_retries >= MAX_NUM_OF_RETRIES) {
688 m_stats_table_iter++;
689 if (m_stats_table_iter == m_statsTable.end())
690 m_stats_table_iter = m_statsTable.begin();
691 }
692 onFailed(interest.getName(), "Unknown failure");
spirosmastorakis9b68b532016-05-02 21:40:29 -0700693 this->sendInterest();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700694 };
spirosmastorakis9b68b532016-05-02 21:40:29 -0700695 LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
696 m_interestQueue->push(interest, dataReceived, dataFailed);
697 this->sendInterest();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700698}
699
700void
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700701TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700702{
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700703 // handle if it is a torrent-file
spirosmastorakis9b68b532016-05-02 21:40:29 -0700704 LOG_DEBUG << "Interest Received: " << interest << std::endl;
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700705 const auto& interestName = interest.getName();
706 std::shared_ptr<Data> data = nullptr;
707 auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700708 // determine if it is torrent file (that we have)
709 auto torrent_it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(), cmp);
710 if (m_torrentSegments.end() != torrent_it) {
711 data = std::make_shared<Data>(*torrent_it);
spirosmastorakis50642f82016-04-08 12:11:18 -0700712 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700713 else {
714 // determine if it is manifest (that we have)
715 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(), cmp);
716 if (m_fileManifests.end() != manifest_it) {
717 data = std::make_shared<Data>(*manifest_it) ;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700718 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700719 else {
720 // determine if it is data packet (that we have)
721 auto manifestName = interestName.getSubName(0, interestName.size() - 2);
722 auto map_it = std::find_if(m_fileStates.begin(), m_fileStates.end(),
723 [&manifestName](const std::pair<Name,
724 std::pair<std::shared_ptr<fs::fstream>,
725 std::vector<bool>>>& kv){
726 return manifestName.isPrefixOf(kv.first);
727 });
728 if (m_fileStates.end() != map_it) {
729 auto packetName = interestName.getSubName(0, interestName.size() - 1);
730 // get out the bitmap to be sure we have the packet
731 auto& fileState = map_it->second;
732 const auto &bitmap = fileState.second;
733 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
734 if (bitmap[packetNum]) {
735 // get the manifest
736 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
737 [&manifestName](const FileManifest& m) {
738 return manifestName.isPrefixOf(m.name());
739 });
740 auto manifestFileName = manifest_it->file_name();
741 auto filePath = m_dataPath + manifestFileName;
742 // TODO(msweatt) Explore why fileState stream does not work
743 fs::fstream is (filePath, fs::fstream::in | fs::fstream::binary);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700744 data = IoUtil::readDataPacket(interestName,
745 *manifest_it,
746 m_subManifestSizes[manifestFileName],
747 is);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700748 }
749 }
750 }
751 }
752 if (nullptr != data) {
753 m_face->put(*data);
754 }
755 else {
756 // TODO(msweatt) NACK
Mickey Sweatt617d2d42016-04-25 22:02:08 -0700757 LOG_ERROR << "NACK: " << interest << std::endl;
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700758 }
759 return;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700760}
761
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700762void
763TorrentManager::onRegisterFailed(const Name& prefix, const std::string& reason)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700764{
Mickey Sweatt617d2d42016-04-25 22:02:08 -0700765 LOG_ERROR << "ERROR: Failed to register prefix \""
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700766 << prefix << "\" in local hub's daemon (" << reason << ")"
767 << std::endl;
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700768 shutdown();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700769}
770
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700771shared_ptr<Interest>
772TorrentManager::createInterest(Name name)
773{
774 shared_ptr<Interest> interest = make_shared<Interest>(name);
775 interest->setInterestLifetime(time::milliseconds(2000));
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700776
777 // Select routable prefix
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700778 // TODO(spyros) Fix links
779 // Link link(name, { {1, m_stats_table_iter->getRecordName()} });
780 // m_keyChain->sign(link, signingWithSha256());
781 // Block linkWire = link.wireEncode();
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700782
783 // Stats Table update here...
784 m_stats_table_iter->incrementSentInterests();
785
786 m_sortingCounter++;
787 if (m_sortingCounter >= SORTING_INTERVAL) {
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700788 // Use the sorting interval to send out "ALIVE" Interests as well
789 // check whether we should send out an "ALIVE" Interest
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700790 // if (m_updateHandler->needsUpdate()) {
791 // m_updateHandler->sendAliveInterest(m_stats_table_iter);
792 // }
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700793 // Do the actual sorting related stuff
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700794 m_sortingCounter = 0;
795 m_statsTable.sort();
796 m_stats_table_iter = m_statsTable.begin();
797 m_retries = 0;
798 }
799
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700800 // interest->setLink(linkWire);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700801
802 return interest;
803}
804
spirosmastorakis9b68b532016-05-02 21:40:29 -0700805void
806TorrentManager::sendInterest()
807{
808 auto nackCallBack = [](const Interest& i, const lp::Nack& n) {
809 LOG_ERROR << "Nack received: " << n.getReason() << ": " << i << std::endl;
810 };
811 while (m_pendingInterests.size() < WINDOW_SIZE && !m_interestQueue->empty()) {
812 queueTuple tup = m_interestQueue->pop();
813 m_pendingInterests.insert(std::get<0>(tup)->getName());
814 LOG_DEBUG << "Sending: " << *(std::get<0>(tup)) << std::endl;
815 m_face->expressInterest(*std::get<0>(tup), std::get<1>(tup), nackCallBack, std::get<2>(tup));
816 }
817}
818
Mickey Sweatt527b0492016-03-02 11:07:48 -0800819} // end ntorrent
spirosmastorakisfd334462016-04-18 15:48:31 -0700820} // end ndn