blob: 892228f4515e40748a7bce6d88428bfdae703e50 [file] [log] [blame]
Mickey Sweatt527b0492016-03-02 11:07:48 -08001#include "torrent-manager.hpp"
2
3#include "file-manifest.hpp"
4#include "torrent-file.hpp"
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -07005#include "util/io-util.hpp"
Mickey Sweatt617d2d42016-04-25 22:02:08 -07006#include "util/logging.hpp"
Mickey Sweatt527b0492016-03-02 11:07:48 -08007
8#include <boost/filesystem.hpp>
9#include <boost/filesystem/fstream.hpp>
10
11#include <ndn-cxx/data.hpp>
12#include <ndn-cxx/security/key-chain.hpp>
13#include <ndn-cxx/security/signing-helpers.hpp>
14#include <ndn-cxx/util/io.hpp>
15
16#include <set>
17#include <string>
18#include <unordered_map>
19#include <vector>
Mickey Sweatt0dc0a1e2016-05-04 11:25:49 -070020#include <boost/asio/io_service.hpp>
Mickey Sweatt527b0492016-03-02 11:07:48 -080021
22namespace fs = boost::filesystem;
23
24using std::string;
25using std::vector;
26
Mickey Sweatt527b0492016-03-02 11:07:48 -080027namespace ndn {
28namespace ntorrent {
29
Mickey Sweatt527b0492016-03-02 11:07:48 -080030static vector<TorrentFile>
31intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
32{
33 security::KeyChain key_chain;
34 Name currSegmentFullName = initialSegmentName;
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070035 vector<TorrentFile> torrentSegments = IoUtil::load_directory<TorrentFile>(torrentFilePath);
Mickey Sweatt527b0492016-03-02 11:07:48 -080036 // Starting with the initial segment name, verify the names, loading next name from torrentSegment
37 for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
38 TorrentFile& segment = *it;
39 key_chain.sign(segment, signingWithSha256());
40 if (segment.getFullName() != currSegmentFullName) {
41 vector<TorrentFile> correctSegments(torrentSegments.begin(), it);
42 torrentSegments.swap(correctSegments);
43 break;
44 }
45 // load the next full name
46 if (nullptr == segment.getTorrentFilePtr()) {
47 break;
48 }
49 currSegmentFullName = *segment.getTorrentFilePtr();
50 }
51 return torrentSegments;
52}
53
54static vector<FileManifest>
Mickey Sweatte908a5c2016-04-08 14:10:45 -070055intializeFileManifests(const string& manifestPath, const vector<TorrentFile>& torrentSegments)
Mickey Sweatt527b0492016-03-02 11:07:48 -080056{
57 security::KeyChain key_chain;
58
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070059 vector<FileManifest> manifests = IoUtil::load_directory<FileManifest>(manifestPath);
Mickey Sweatte908a5c2016-04-08 14:10:45 -070060 if (manifests.empty()) {
61 return manifests;
62 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -070063
Mickey Sweatt527b0492016-03-02 11:07:48 -080064 // sign the manifests
65 std::for_each(manifests.begin(), manifests.end(),
66 [&key_chain](FileManifest& m){
67 key_chain.sign(m,signingWithSha256());
68 });
69
Mickey Sweatte908a5c2016-04-08 14:10:45 -070070 // put all names of initial manifests from the valid torrent files into a set
71 std::vector<ndn::Name> validInitialManifestNames;
Mickey Sweatt527b0492016-03-02 11:07:48 -080072 for (const auto& segment : torrentSegments) {
73 const auto& catalog = segment.getCatalog();
Mickey Sweatte908a5c2016-04-08 14:10:45 -070074 validInitialManifestNames.insert(validInitialManifestNames.end(),
75 catalog.begin(),
76 catalog.end());
Mickey Sweatt527b0492016-03-02 11:07:48 -080077 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -070078 auto manifest_it = manifests.begin();
79 std::vector<FileManifest> output;
80 output.reserve(manifests.size());
Mickey Sweatt527b0492016-03-02 11:07:48 -080081
Mickey Sweatte908a5c2016-04-08 14:10:45 -070082 for (auto& initialName : validInitialManifestNames) {
83 // starting from the initial segment
84 auto& validName = initialName;
85 if (manifests.end() == manifest_it) {
86 break;
87 }
88 auto fileName = manifest_it->file_name();
89 // sequential collect all valid segments
90 while (manifest_it != manifests.end() && manifest_it->getFullName() == validName) {
91 output.push_back(*manifest_it);
92 if (manifest_it->submanifest_ptr() != nullptr) {
93 validName = *manifest_it->submanifest_ptr();
94 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -080095 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -070096 else {
97 ++manifest_it;
98 break;
99 }
100 }
101 // skip the remain segments for this file (all invalid)
102 while (manifests.end() != manifest_it && manifest_it->file_name() == fileName) {
103 ++manifest_it;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800104 }
105 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700106 return output;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800107}
108
109static vector<Data>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700110initializeDataPackets(const string& filePath,
111 const FileManifest manifest,
112 size_t subManifestSize)
Mickey Sweatt527b0492016-03-02 11:07:48 -0800113{
114 vector<Data> packets;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700115 auto subManifestNum = manifest.submanifest_number();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800116
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700117 packets = IoUtil::packetize_file(filePath,
118 manifest.name(),
119 manifest.data_packet_size(),
120 subManifestSize,
121 subManifestNum);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800122
123 auto catalog = manifest.catalog();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800124 // Filter out invalid packet names
125 std::remove_if(packets.begin(), packets.end(),
126 [&packets, &catalog](const Data& p) {
127 return catalog.end() == std::find(catalog.begin(),
128 catalog.end(),
129 p.getFullName());
130 });
131 return packets;
132}
133
Mickey Sweattafda1f12016-04-04 17:15:11 -0700134static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700135initializeFileState(const string& dataPath,
136 const FileManifest& manifest,
137 size_t subManifestSize)
Mickey Sweattafda1f12016-04-04 17:15:11 -0700138{
139 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700140 auto fileName = manifest.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700141 auto filePath = dataPath + fileName;
142 vector<bool> fileBitMap(manifest.catalog().size());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700143 // if the file does not exist, create an empty placeholder (otherwise cannot set read-bit)
144 if (!fs::exists(filePath)) {
145 fs::ofstream fs(filePath);
146 fs << "";
Mickey Sweattafda1f12016-04-04 17:15:11 -0700147 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700148 auto s = std::make_shared<fs::fstream>(filePath,
149 fs::fstream::out
150 | fs::fstream::binary
151 | fs::fstream::in);
152 if (!*s) {
153 BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + filePath));
154 }
155 auto start_offset = manifest.submanifest_number() * subManifestSize * manifest.data_packet_size();
156 s->seekg(start_offset);
157 s->seekp(start_offset);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700158 return std::make_pair(s, fileBitMap);
159}
160
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700161//==================================================================================================
162// TorrentManager Implementation
163//==================================================================================================
164
Mickey Sweatt527b0492016-03-02 11:07:48 -0800165void TorrentManager::Initialize()
166{
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700167 // initialize the update handler
168
169 // figure out the name of the torrent
170 Name torrentName;
171 if (m_torrentFileName.get(m_torrentFileName.size() - 2).isSequenceNumber()) {
Mickey Sweatt0dc0a1e2016-05-04 11:25:49 -0700172 torrentName = m_torrentFileName.getSubName(3, m_torrentFileName.size() - 6);
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700173 }
174 else {
Mickey Sweatt0dc0a1e2016-05-04 11:25:49 -0700175 torrentName = m_torrentFileName.getSubName(3, m_torrentFileName.size() - 5);
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700176 }
177
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700178 // TODO(spyros) Get update manager working
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700179 // m_updateHandler = make_shared<UpdateHandler>(torrentName, m_keyChain,
180 // make_shared<StatsTable>(m_statsTable), m_face);
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700181
Mickey Sweatt527b0492016-03-02 11:07:48 -0800182 // .../<torrent_name>/torrent-file/<implicit_digest>
183 string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
184 string manifestPath = dataPath +"/manifests";
185 string torrentFilePath = dataPath +"/torrent_files";
186
187 // get the torrent file segments and manifests that we have.
188 if (!fs::exists(torrentFilePath)) {
189 return;
190 }
191 m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
192 if (m_torrentSegments.empty()) {
193 return;
194 }
195 m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700196
197 // get the submanifest sizes
Mickey Sweatt527b0492016-03-02 11:07:48 -0800198 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700199 if (m.submanifest_number() == 0) {
200 auto manifestFileName = m.file_name();
201 m_subManifestSizes[manifestFileName] = m.catalog().size();
Mickey Sweatt527b0492016-03-02 11:07:48 -0800202 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700203 }
204
205 for (const auto& m : m_fileManifests) {
Mickey Sweatt527b0492016-03-02 11:07:48 -0800206 // construct the file name
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700207 auto fileName = m.file_name();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700208 fs::path filePath = m_dataPath + fileName;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800209 // If there are any valid packets, add corresponding state to manager
Mickey Sweattafda1f12016-04-04 17:15:11 -0700210 if (!fs::exists(filePath)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700211 if (!fs::exists(filePath.parent_path())) {
212 boost::filesystem::create_directories(filePath.parent_path());
213 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700214 continue;
215 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700216 auto packets = initializeDataPackets(filePath.string(), m, m_subManifestSizes[m.file_name()]);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800217 if (!packets.empty()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700218 m_fileStates[m.getFullName()] = initializeFileState(m_dataPath,
219 m,
220 m_subManifestSizes[m.file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700221 auto& fileBitMap = m_fileStates[m.getFullName()].second;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800222 auto read_it = packets.begin();
223 size_t i = 0;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700224 for (auto name : m.catalog()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700225 if (read_it == packets.end()) {
226 break;
227 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800228 if (name == read_it->getFullName()) {
229 ++read_it;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700230 fileBitMap[i] = true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800231 }
232 ++i;
233 }
234 for (const auto& d : packets) {
235 seed(d);
236 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800237 }
238 }
239 for (const auto& t : m_torrentSegments) {
240 seed(t);
241 }
242 for (const auto& m : m_fileManifests) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700243 seed(m);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800244 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700245}
Mickey Sweatt527b0492016-03-02 11:07:48 -0800246
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700247shared_ptr<Name>
248TorrentManager::findTorrentFileSegmentToDownload() const
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700249{
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700250 // if we have no segments
251 if (m_torrentSegments.empty()) {
252 return make_shared<Name>(m_torrentFileName);
253 }
254 // otherwise just return the next segment ptr of the last segment we have
255 return m_torrentSegments.back().getTorrentFilePtr();
256}
257
258shared_ptr<Name>
259TorrentManager::findManifestSegmentToDownload(const Name& manifestName) const
260{
261 //sequentially find whether we have downloaded any segments of this manifest file
262 Name manifestPrefix = manifestName.getSubName(0, manifestName.size() - 2);
263 auto it = std::find_if(m_fileManifests.rbegin(), m_fileManifests.rend(),
264 [&manifestPrefix] (const FileManifest& f) {
265 return manifestPrefix.isPrefixOf(f.getName());
266 });
267
268 // if we do not have any segments of the file manifest
269 if (it == m_fileManifests.rend()) {
270 return make_shared<Name>(manifestName);
271 }
272
273 // if we already have the requested segment of the file manifest
274 if (it->submanifest_number() >= manifestName.get(manifestName.size() - 2).toSequenceNumber()) {
275 return it->submanifest_ptr();
276 }
277 // if we do not have the requested segment
278 else {
279 return make_shared<Name>(manifestName);
280 }
281}
282
283void
284TorrentManager::findFileManifestsToDownload(std::vector<Name>& manifestNames) const
285{
286 std::vector<Name> manifests;
287 // insert the first segment name of all the file manifests to the vector
288 for (auto i = m_torrentSegments.begin(); i != m_torrentSegments.end(); i++) {
289 manifests.insert(manifests.end(), i->getCatalog().begin(), i->getCatalog().end());
290 }
291 // for each file
292 for (const auto& manifestName : manifests) {
293 // find the first (if any) segment we are missing
294 shared_ptr<Name> manifestSegmentName = findManifestSegmentToDownload(manifestName);
295 if (nullptr != manifestSegmentName) {
296 manifestNames.push_back(*manifestSegmentName);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700297 }
298 }
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700299}
300
301bool
302TorrentManager::hasDataPacket(const Name& dataName) const
303{
304
305 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
306 [&dataName](const FileManifest& m) {
307 return m.getName().isPrefixOf(dataName);
308 });
309
310 // if we do not have the file manifest, just return false
311 if (manifest_it == m_fileManifests.end()) {
312 return false;
313 }
314
315 // find the pair of (std::shared_ptr<fs::fstream>, std::vector<bool>)
316 // that corresponds to the specific submanifest
317 auto fileState_it = m_fileStates.find(manifest_it->getFullName());
318 if (m_fileStates.end() != fileState_it) {
319 const auto& fileState = fileState_it->second;
320 auto dataNum = dataName.get(dataName.size() - 2).toSequenceNumber();
321 // find whether we have the requested packet from the bitmap
322 return fileState.second[dataNum];
323 }
324 return false;
325}
326
327void
328TorrentManager::findDataPacketsToDownload(const Name& manifestName, std::vector<Name>& packetNames) const
329{
330 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
331 [&manifestName](const FileManifest& m) {
332 return m.name().getSubName(0, m.name().size()
333 - 1).isPrefixOf(manifestName);
334 });
335
336 for (auto j = manifest_it; j != m_fileManifests.end(); j++) {
337 auto& fileState = m_fileStates[j->getFullName()];
338 for (size_t dataNum = 0; dataNum < j->catalog().size(); ++dataNum) {
339 if (!fileState.second[dataNum]) {
340 packetNames.push_back(j->catalog()[dataNum]);
341 }
342 }
343
344 // check that the next manifest in the vector refers to the next segment of the same file
345 if ((j + 1) != m_fileManifests.end() && (j+1)->file_name() != manifest_it->file_name()) {
346 break;
347 }
348 }
349}
350
351void
352TorrentManager::findAllMissingDataPackets(std::vector<Name>& packetNames) const
353{
354 for (auto j = m_fileManifests.begin(); j != m_fileManifests.end(); ++j) {
355 auto fileState_it = m_fileStates.find(j->getFullName());
356 // if we have no packets from this file
357 if (m_fileStates.end() == fileState_it) {
358 packetNames.reserve(packetNames.size() + j->catalog().size());
359 packetNames.insert(packetNames.end(), j->catalog().begin(), j->catalog().end());
360 }
361 // find the packets that we are missing
362 else {
363 const auto &fileState = fileState_it->second;
364 for (auto i = j->catalog().begin(); i != j->catalog().end(); i++) {
365 auto dataNum = i->get(i->size() - 2).toSequenceNumber();
366 if (!fileState.second[dataNum]) {
367 packetNames.push_back(*i);
368 }
369 }
370 }
371 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700372}
373
374void
375TorrentManager::downloadTorrentFileSegment(const ndn::Name& name,
376 const std::string& path,
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700377 TorrentFileReceivedCallback onSuccess,
378 FailedCallback onFailed)
379{
380 shared_ptr<Interest> interest = createInterest(name);
381
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700382 auto dataReceived = [path, onSuccess, onFailed, this]
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700383 (const Interest& interest, const Data& data) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700384 m_pendingInterests.erase(interest.getName());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700385 // Stats Table update here...
386 m_stats_table_iter->incrementReceivedData();
387 m_retries = 0;
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700388 std::vector<Name> manifestNames;
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700389 TorrentFile file(data.wireEncode());
390
391 // Write the torrent file segment to disk...
392 if (writeTorrentSegment(file, path)) {
393 // if successfully written, seed this data
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700394 seed(file);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700395 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700396 const std::vector<Name>& manifestCatalog = file.getCatalog();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700397 manifestNames.insert(manifestNames.end(), manifestCatalog.begin(), manifestCatalog.end());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700398
399 shared_ptr<Name> nextSegmentPtr = file.getTorrentFilePtr();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700400 if (onSuccess) {
401 onSuccess(manifestNames);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700402 }
403 if (nextSegmentPtr != nullptr) {
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700404 this->downloadTorrentFileSegment(*nextSegmentPtr, path, onSuccess, onFailed);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700405 }
spirosmastorakis9b68b532016-05-02 21:40:29 -0700406 this->sendInterest();
407 if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700408 shutdown();
409 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700410 };
411
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700412 auto dataFailed = [path, name, onSuccess, onFailed, this]
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700413 (const Interest& interest) {
spirosmastorakis9b68b532016-05-02 21:40:29 -0700414 m_pendingInterests.erase(interest.getName());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700415 ++m_retries;
416 if (m_retries >= MAX_NUM_OF_RETRIES) {
417 ++m_stats_table_iter;
418 if (m_stats_table_iter == m_statsTable.end()) {
419 m_stats_table_iter = m_statsTable.begin();
420 }
421 }
spirosmastorakis9b68b532016-05-02 21:40:29 -0700422 this->sendInterest();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700423 if (onFailed) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700424 onFailed(interest.getName(), "Unknown error");
425 }
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700426 this->downloadTorrentFileSegment(name, path, onSuccess, onFailed);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700427 };
spirosmastorakis9b68b532016-05-02 21:40:29 -0700428 LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
429 m_interestQueue->push(interest, dataReceived, dataFailed);
430 this->sendInterest();
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700431}
432
433void
434TorrentManager::downloadTorrentFile(const std::string& path,
435 TorrentFileReceivedCallback onSuccess,
436 FailedCallback onFailed)
437{
438 shared_ptr<Name> searchRes = this->findTorrentFileSegmentToDownload();
439 auto manifestNames = make_shared<std::vector<Name>>();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700440 if (searchRes != nullptr) {
441 this->downloadTorrentFileSegment(*searchRes, path, onSuccess, onFailed);
442 }
443 else {
444 std::vector<Name> manifests;
445 findFileManifestsToDownload(manifests);
446 if (onSuccess) {
447 onSuccess(manifests);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700448 }
449 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700450}
451
452void
453TorrentManager::download_file_manifest(const Name& manifestName,
454 const std::string& path,
455 TorrentManager::ManifestReceivedCallback onSuccess,
456 TorrentManager::FailedCallback onFailed)
457{
458 shared_ptr<Name> searchRes = findManifestSegmentToDownload(manifestName);
459 auto packetNames = make_shared<std::vector<Name>>();
460 if (searchRes == nullptr) {
461 this->findDataPacketsToDownload(manifestName, *packetNames);
462 onSuccess(*packetNames);
463 return;
464 }
465 this->downloadFileManifestSegment(*searchRes, path, packetNames, onSuccess, onFailed);
466}
467
468void
469TorrentManager::download_data_packet(const Name& packetName,
470 DataReceivedCallback onSuccess,
471 FailedCallback onFailed)
472{
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700473 if (this->hasDataPacket(packetName)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700474 onSuccess(packetName);
475 return;
476 }
477
478 shared_ptr<Interest> interest = this->createInterest(packetName);
479
480 auto dataReceived = [onSuccess, onFailed, this]
481 (const Interest& interest, const Data& data) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700482 m_pendingInterests.erase(interest.getName());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700483 // Write data to disk...
484 if(writeData(data)) {
485 seed(data);
486 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700487 // Stats Table update here...
488 m_stats_table_iter->incrementReceivedData();
489 m_retries = 0;
490 onSuccess(data.getName());
spirosmastorakis9b68b532016-05-02 21:40:29 -0700491 this->sendInterest();
492 if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700493 shutdown();
494 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700495 };
spirosmastorakis9b68b532016-05-02 21:40:29 -0700496
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700497 auto dataFailed = [onFailed, this]
498 (const Interest& interest) {
499 m_retries++;
spirosmastorakis9b68b532016-05-02 21:40:29 -0700500 m_pendingInterests.erase(interest.getName());
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700501 if (m_retries >= MAX_NUM_OF_RETRIES) {
502 m_stats_table_iter++;
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700503 if (m_stats_table_iter == m_statsTable.end()) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700504 m_stats_table_iter = m_statsTable.begin();
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700505 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700506 }
507 onFailed(interest.getName(), "Unknown failure");
spirosmastorakis9b68b532016-05-02 21:40:29 -0700508 this->sendInterest();
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700509 };
spirosmastorakis9b68b532016-05-02 21:40:29 -0700510 LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
511 m_interestQueue->push(interest, dataReceived, dataFailed);
512 this->sendInterest();
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700513}
514
515void TorrentManager::seed(const Data& data) {
516 m_face->setInterestFilter(data.getFullName(),
517 bind(&TorrentManager::onInterestReceived, this, _1, _2),
518 RegisterPrefixSuccessCallback(),
519 bind(&TorrentManager::onRegisterFailed, this, _1, _2));
520}
521
Mickey Sweatt0dc0a1e2016-05-04 11:25:49 -0700522void
523TorrentManager::shutdown()
524{
525 m_face->getIoService().stop();
526}
527
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700528// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
529// Protected Helpers
530// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
531
Mickey Sweattafda1f12016-04-04 17:15:11 -0700532bool TorrentManager::writeData(const Data& packet)
533{
534 // find correct manifest
535 const auto& packetName = packet.getName();
536 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
537 [&packetName](const FileManifest& m) {
538 return m.getName().isPrefixOf(packetName);
539 });
540 if (m_fileManifests.end() == manifest_it) {
541 return false;
542 }
543 // get file state out
544 auto& fileState = m_fileStates[manifest_it->getFullName()];
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700545
Mickey Sweattafda1f12016-04-04 17:15:11 -0700546 // if there is no open stream to the file
547 if (nullptr == fileState.first) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700548 fs::path filePath = m_dataPath + manifest_it->file_name();
549 if (!fs::exists(filePath)) {
550 fs::create_directories(filePath.parent_path());
551 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700552 fileState = initializeFileState(m_dataPath,
553 *manifest_it,
554 m_subManifestSizes[manifest_it->file_name()]);
Mickey Sweattafda1f12016-04-04 17:15:11 -0700555 }
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700556 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700557 // if we already have the packet, do not rewrite it.
558 if (fileState.second[packetNum]) {
559 return false;
560 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700561 // write data to disk
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700562 // TODO(msweatt) Fix this once code is merged
563 auto subManifestSize = m_subManifestSizes[manifest_it->file_name()];
564 if (IoUtil::writeData(packet, *manifest_it, subManifestSize, *fileState.first)) {
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700565 fileState.first->flush();
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700566 // update bitmap
567 fileState.second[packetNum] = true;
568 return true;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700569 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700570 return false;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800571}
572
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700573bool
574TorrentManager::writeTorrentSegment(const TorrentFile& segment, const std::string& path)
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700575{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700576 // validate the torrent
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700577 auto torrentPrefix = m_torrentFileName.getSubName(0, m_torrentFileName.size() - 1);
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700578 // check if we already have it
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700579 if (torrentPrefix.isPrefixOf(segment.getName()) &&
580 m_torrentSegments.end() == std::find(m_torrentSegments.begin(), m_torrentSegments.end(),
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700581 segment))
582 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700583 if(IoUtil::writeTorrentSegment(segment, path)) {
584 auto it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(),
585 [&segment](const TorrentFile& t){
586 return segment.getSegmentNumber() < t.getSegmentNumber() ;
587 });
588 m_torrentSegments.insert(it, segment);
589 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700590 }
591 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700592 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700593}
594
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700595
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700596bool TorrentManager::writeFileManifest(const FileManifest& manifest, const std::string& path)
597{
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700598 if (m_fileManifests.end() == std::find(m_fileManifests.begin(), m_fileManifests.end(),
599 manifest))
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700600 {
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700601 // update the state of the manager
602 if (0 == manifest.submanifest_number()) {
603 m_subManifestSizes[manifest.file_name()] = manifest.catalog().size();
604 }
605 if(IoUtil::writeFileManifest(manifest, path)) {
606 // add to collection
607 auto it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
608 [&manifest](const FileManifest& m){
609 return m.file_name() > manifest.file_name()
610 || (m.file_name() == manifest.file_name()
611 && (m.submanifest_number() > manifest.submanifest_number()));
612 });
613 m_fileManifests.insert(it, manifest);
614 return true;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700615 }
616 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700617 return false;
Mickey Sweatt599bfef2016-04-05 19:11:20 -0700618}
619
spirosmastorakisa46eee42016-04-05 14:24:45 -0700620void
621TorrentManager::downloadFileManifestSegment(const Name& manifestName,
622 const std::string& path,
623 std::shared_ptr<std::vector<Name>> packetNames,
624 TorrentManager::ManifestReceivedCallback onSuccess,
625 TorrentManager::FailedCallback onFailed)
626{
627 shared_ptr<Interest> interest = this->createInterest(manifestName);
628
629 auto dataReceived = [packetNames, path, onSuccess, onFailed, this]
630 (const Interest& interest, const Data& data) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700631 m_pendingInterests.erase(interest.getName());
spirosmastorakisa46eee42016-04-05 14:24:45 -0700632 // Stats Table update here...
633 m_stats_table_iter->incrementReceivedData();
634 m_retries = 0;
635
636 FileManifest file(data.wireEncode());
637
638 // Write the file manifest segment to disk...
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700639 if(writeFileManifest(file, path)) {
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700640 seed(file);
641 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700642 else {
643 onFailed(interest.getName(), "Write Failed");
644 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700645
646 const std::vector<Name>& packetsCatalog = file.catalog();
647 packetNames->insert(packetNames->end(), packetsCatalog.begin(), packetsCatalog.end());
648 shared_ptr<Name> nextSegmentPtr = file.submanifest_ptr();
649 if (nextSegmentPtr != nullptr) {
650 this->downloadFileManifestSegment(*nextSegmentPtr, path, packetNames, onSuccess, onFailed);
651 }
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700652 else {
spirosmastorakisa46eee42016-04-05 14:24:45 -0700653 onSuccess(*packetNames);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700654 }
spirosmastorakis9b68b532016-05-02 21:40:29 -0700655 this->sendInterest();
656 if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700657 shutdown();
658 }
spirosmastorakisa46eee42016-04-05 14:24:45 -0700659 };
660
661 auto dataFailed = [packetNames, path, manifestName, onFailed, this]
662 (const Interest& interest) {
spirosmastorakis9b68b532016-05-02 21:40:29 -0700663 m_pendingInterests.erase(interest.getName());
spirosmastorakisa46eee42016-04-05 14:24:45 -0700664 m_retries++;
665 if (m_retries >= MAX_NUM_OF_RETRIES) {
666 m_stats_table_iter++;
667 if (m_stats_table_iter == m_statsTable.end())
668 m_stats_table_iter = m_statsTable.begin();
669 }
670 onFailed(interest.getName(), "Unknown failure");
spirosmastorakis9b68b532016-05-02 21:40:29 -0700671 this->sendInterest();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700672 };
spirosmastorakis9b68b532016-05-02 21:40:29 -0700673 LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
674 m_interestQueue->push(interest, dataReceived, dataFailed);
675 this->sendInterest();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700676}
677
678void
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700679TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700680{
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700681 // handle if it is a torrent-file
spirosmastorakis9b68b532016-05-02 21:40:29 -0700682 LOG_DEBUG << "Interest Received: " << interest << std::endl;
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700683 const auto& interestName = interest.getName();
684 std::shared_ptr<Data> data = nullptr;
685 auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700686 // determine if it is torrent file (that we have)
687 auto torrent_it = std::find_if(m_torrentSegments.begin(), m_torrentSegments.end(), cmp);
688 if (m_torrentSegments.end() != torrent_it) {
689 data = std::make_shared<Data>(*torrent_it);
spirosmastorakis50642f82016-04-08 12:11:18 -0700690 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700691 else {
692 // determine if it is manifest (that we have)
693 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(), cmp);
694 if (m_fileManifests.end() != manifest_it) {
695 data = std::make_shared<Data>(*manifest_it) ;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700696 }
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700697 else {
698 // determine if it is data packet (that we have)
699 auto manifestName = interestName.getSubName(0, interestName.size() - 2);
700 auto map_it = std::find_if(m_fileStates.begin(), m_fileStates.end(),
701 [&manifestName](const std::pair<Name,
702 std::pair<std::shared_ptr<fs::fstream>,
703 std::vector<bool>>>& kv){
704 return manifestName.isPrefixOf(kv.first);
705 });
706 if (m_fileStates.end() != map_it) {
707 auto packetName = interestName.getSubName(0, interestName.size() - 1);
708 // get out the bitmap to be sure we have the packet
709 auto& fileState = map_it->second;
710 const auto &bitmap = fileState.second;
711 auto packetNum = packetName.get(packetName.size() - 1).toSequenceNumber();
712 if (bitmap[packetNum]) {
713 // get the manifest
714 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
715 [&manifestName](const FileManifest& m) {
716 return manifestName.isPrefixOf(m.name());
717 });
718 auto manifestFileName = manifest_it->file_name();
719 auto filePath = m_dataPath + manifestFileName;
720 // TODO(msweatt) Explore why fileState stream does not work
721 fs::fstream is (filePath, fs::fstream::in | fs::fstream::binary);
Mickey Sweattfcbfb3d2016-04-13 17:05:17 -0700722 data = IoUtil::readDataPacket(interestName,
723 *manifest_it,
724 m_subManifestSizes[manifestFileName],
725 is);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700726 }
727 }
728 }
729 }
730 if (nullptr != data) {
731 m_face->put(*data);
732 }
733 else {
734 // TODO(msweatt) NACK
Mickey Sweatt617d2d42016-04-25 22:02:08 -0700735 LOG_ERROR << "NACK: " << interest << std::endl;
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700736 }
737 return;
spirosmastorakisa46eee42016-04-05 14:24:45 -0700738}
739
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700740void
741TorrentManager::onRegisterFailed(const Name& prefix, const std::string& reason)
spirosmastorakisa46eee42016-04-05 14:24:45 -0700742{
Mickey Sweatt617d2d42016-04-25 22:02:08 -0700743 LOG_ERROR << "ERROR: Failed to register prefix \""
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700744 << prefix << "\" in local hub's daemon (" << reason << ")"
745 << std::endl;
Mickey Sweatt44e4fd92016-05-02 15:43:11 -0700746 shutdown();
spirosmastorakisa46eee42016-04-05 14:24:45 -0700747}
748
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700749shared_ptr<Interest>
750TorrentManager::createInterest(Name name)
751{
752 shared_ptr<Interest> interest = make_shared<Interest>(name);
753 interest->setInterestLifetime(time::milliseconds(2000));
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700754
755 // Select routable prefix
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700756 // TODO(spyros) Fix links
757 // Link link(name, { {1, m_stats_table_iter->getRecordName()} });
758 // m_keyChain->sign(link, signingWithSha256());
759 // Block linkWire = link.wireEncode();
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700760
761 // Stats Table update here...
762 m_stats_table_iter->incrementSentInterests();
763
764 m_sortingCounter++;
765 if (m_sortingCounter >= SORTING_INTERVAL) {
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700766 // Use the sorting interval to send out "ALIVE" Interests as well
767 // check whether we should send out an "ALIVE" Interest
Mickey Sweatt67133bf2016-04-25 12:37:35 -0700768 // if (m_updateHandler->needsUpdate()) {
769 // m_updateHandler->sendAliveInterest(m_stats_table_iter);
770 // }
spirosmastorakis4ff8c872016-04-14 09:51:38 -0700771 // Do the actual sorting related stuff
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700772 m_sortingCounter = 0;
773 m_statsTable.sort();
774 m_stats_table_iter = m_statsTable.begin();
775 m_retries = 0;
776 }
777
Mickey Sweatt15dde2d2016-04-28 23:42:45 -0700778 // interest->setLink(linkWire);
Mickey Sweatte908a5c2016-04-08 14:10:45 -0700779
780 return interest;
781}
782
spirosmastorakis9b68b532016-05-02 21:40:29 -0700783void
784TorrentManager::sendInterest()
785{
786 auto nackCallBack = [](const Interest& i, const lp::Nack& n) {
787 LOG_ERROR << "Nack received: " << n.getReason() << ": " << i << std::endl;
788 };
789 while (m_pendingInterests.size() < WINDOW_SIZE && !m_interestQueue->empty()) {
790 queueTuple tup = m_interestQueue->pop();
791 m_pendingInterests.insert(std::get<0>(tup)->getName());
792 LOG_DEBUG << "Sending: " << *(std::get<0>(tup)) << std::endl;
793 m_face->expressInterest(*std::get<0>(tup), std::get<1>(tup), nackCallBack, std::get<2>(tup));
794 }
795}
796
Mickey Sweatt527b0492016-03-02 11:07:48 -0800797} // end ntorrent
spirosmastorakisfd334462016-04-18 15:48:31 -0700798} // end ndn