blob: f62491fa7b73cc6f67d928ff9ab56fbfdb65028c [file] [log] [blame]
#include "torrent-manager.hpp"

#include "file-manifest.hpp"
#include "torrent-file.hpp"

#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>

#include <ndn-cxx/data.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/security/signing-helpers.hpp>
#include <ndn-cxx/util/io.hpp>

#include <algorithm>
#include <iostream>
#include <iterator>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
18
19namespace fs = boost::filesystem;
20
21using std::string;
22using std::vector;
23
24namespace {
25// TODO(msweatt) Move this to a utility
26template<typename T>
27static vector<T>
28load_directory(const string& dirPath,
29 ndn::io::IoEncoding encoding = ndn::io::IoEncoding::BASE_64) {
30 vector<T> structures;
31
32 if (fs::exists(dirPath)) {
33 for(fs::directory_iterator it(dirPath);
34 it != fs::directory_iterator();
35 ++it)
36 {
37 auto data_ptr = ndn::io::load<T>(it->path().string(), encoding);
38 if (nullptr != data_ptr) {
39 structures.push_back(*data_ptr);
40 }
41 }
42 }
43 structures.shrink_to_fit();
44 return structures;
45}
46
47} // end anonymous
48
49namespace ndn {
50namespace ntorrent {
51
52// TODO(msweatt) Move this to a utility
53static vector<ndn::Data>
54packetize_file(const fs::path& filePath,
55 const ndn::Name& commonPrefix,
56 size_t dataPacketSize,
57 size_t subManifestSize,
58 size_t subManifestNum)
59{
60 BOOST_ASSERT(0 < dataPacketSize);
61 size_t APPROX_BUFFER_SIZE = std::numeric_limits<int>::max(); // 2 * 1024 * 1024 *1024
62 auto file_size = fs::file_size(filePath);
63 auto start_offset = subManifestNum * subManifestSize * dataPacketSize;
64 // determine the number of bytes in this submanifest
65 auto subManifestLength = subManifestSize * dataPacketSize;
66 auto remainingFileLength = file_size - start_offset;
67 subManifestLength = remainingFileLength < subManifestLength
68 ? remainingFileLength
69 : subManifestLength;
70 vector<ndn::Data> packets;
71 packets.reserve(subManifestLength/dataPacketSize + 1);
72 fs::ifstream fs(filePath, fs::ifstream::binary);
73 if (!fs) {
74 BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when opening" + filePath.string()));
75 }
76 // ensure that buffer is large enough to contain whole packets
77 // buffer size is either the entire file or the smallest number of data packets >= 2 GB
78 auto buffer_size =
79 subManifestLength < APPROX_BUFFER_SIZE ?
80 subManifestLength :
81 APPROX_BUFFER_SIZE % dataPacketSize == 0 ?
82 APPROX_BUFFER_SIZE :
83 APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize);
84 vector<char> file_bytes;
85 file_bytes.reserve(buffer_size);
86 size_t bytes_read = 0;
87 fs.seekg(start_offset);
88 while(fs && bytes_read < subManifestLength && !fs.eof()) {
89 // read the file into the buffer
90 fs.read(&file_bytes.front(), buffer_size);
91 auto read_size = fs.gcount();
92 if (fs.bad() || read_size < 0) {
93 BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when reading" + filePath.string()));
94 }
95 bytes_read += read_size;
96 char *curr_start = &file_bytes.front();
97 for (size_t i = 0u; i < buffer_size; i += dataPacketSize) {
98 // Build a packet from the data
99 Name packetName = commonPrefix;
100 packetName.appendSequenceNumber(packets.size());
101 Data d(packetName);
102 auto content_length = i + dataPacketSize > buffer_size ? buffer_size - i : dataPacketSize;
103 d.setContent(encoding::makeBinaryBlock(tlv::Content, curr_start, content_length));
104 curr_start += content_length;
105 // append to the collection
106 packets.push_back(d);
107 }
108 file_bytes.clear();
109 // recompute the buffer_size
110 buffer_size =
111 subManifestLength - bytes_read < APPROX_BUFFER_SIZE ?
112 subManifestLength - bytes_read :
113 APPROX_BUFFER_SIZE % dataPacketSize == 0 ?
114 APPROX_BUFFER_SIZE :
115 APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize);
116 }
117 fs.close();
118 packets.shrink_to_fit();
119 ndn::security::KeyChain key_chain;
120 // sign all the packets
121 for (auto& p : packets) {
122 key_chain.sign(p, signingWithSha256());
123 }
124 return packets;
125}
126
127static vector<TorrentFile>
128intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
129{
130 security::KeyChain key_chain;
131 Name currSegmentFullName = initialSegmentName;
132 vector<TorrentFile> torrentSegments = load_directory<TorrentFile>(torrentFilePath);
133 // Starting with the initial segment name, verify the names, loading next name from torrentSegment
134 for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
135 TorrentFile& segment = *it;
136 key_chain.sign(segment, signingWithSha256());
137 if (segment.getFullName() != currSegmentFullName) {
138 vector<TorrentFile> correctSegments(torrentSegments.begin(), it);
139 torrentSegments.swap(correctSegments);
140 break;
141 }
142 // load the next full name
143 if (nullptr == segment.getTorrentFilePtr()) {
144 break;
145 }
146 currSegmentFullName = *segment.getTorrentFilePtr();
147 }
148 return torrentSegments;
149}
150
151static vector<FileManifest>
152intializeFileManifests(const string& manifestPath, vector<TorrentFile> torrentSegments)
153{
154 security::KeyChain key_chain;
155
156 vector<FileManifest> manifests = load_directory<FileManifest>(manifestPath);
157
158 // sign the manifests
159 std::for_each(manifests.begin(), manifests.end(),
160 [&key_chain](FileManifest& m){
161 key_chain.sign(m,signingWithSha256());
162 });
163
164 // put all names of manifests from the valid torrent files into a set
165 std::set<ndn::Name> validManifestNames;
166 for (const auto& segment : torrentSegments) {
167 const auto& catalog = segment.getCatalog();
168 validManifestNames.insert(catalog.begin(), catalog.end());
169 }
170
171 // put all names of file manifests from disk into a set
172 std::set<ndn::Name> loadedManifestNames;
173 std::for_each(manifests.begin(), manifests.end(),
174 [&loadedManifestNames](const FileManifest& m){
175 loadedManifestNames.insert(m.getFullName());
176 });
177
178 // the set of fileManifests that we have is simply the intersection
179 std::set<Name> output;
180 std::set_intersection(validManifestNames.begin() , validManifestNames.end(),
181 loadedManifestNames.begin(), loadedManifestNames.end(),
182 std::inserter(output, output.begin()));
183
184 // filter out those manifests that are not in this set
185 std::remove_if(manifests.begin(),
186 manifests.end(),
187 [&output](const FileManifest& m) {
188 return (output.end() == output.find(m.name()));
189 });
190
191 // order the manifests in the same order they are in the torrent
192 std::vector<Name> catalogNames;
193 for (const auto& segment : torrentSegments) {
194 const auto& catalog = segment.getCatalog();
195 catalogNames.insert(catalogNames.end(), catalog.begin(), catalog.end());
196 }
197 size_t curr_index = 0;
198 for (auto name : catalogNames) {
199 auto it = std::find_if(manifests.begin(), manifests.end(),
200 [&name](const FileManifest& m) {
201 return m.getFullName() == name;
202 });
203 if (it != manifests.end()) {
204 // not already in the correct position
205 if (it != manifests.begin() + curr_index) {
206 std::swap(manifests[curr_index], *it);
207 }
208 ++curr_index;
209 }
210 }
211
212 return manifests;
213}
214
215static vector<Data>
216intializeDataPackets(const string& filePath,
217 const FileManifest manifest,
218 const TorrentFile& torrentFile)
219{
220 vector<Data> packets;
221 auto subManifestNum = manifest.name().get(manifest.name().size() - 1).toSequenceNumber();
222
223 packets = packetize_file(filePath,
224 manifest.name(),
225 manifest.data_packet_size(),
226 manifest.catalog().size(),
227 subManifestNum);
228
229 auto catalog = manifest.catalog();
230
231 // Filter out invalid packet names
232 std::remove_if(packets.begin(), packets.end(),
233 [&packets, &catalog](const Data& p) {
234 return catalog.end() == std::find(catalog.begin(),
235 catalog.end(),
236 p.getFullName());
237 });
238 return packets;
239}
240
Mickey Sweattafda1f12016-04-04 17:15:11 -0700241static std::pair<std::shared_ptr<fs::fstream>, std::vector<bool>>
242initializeFileState(const string& dataPath,
243 const FileManifest& manifest)
244{
245 // construct the file name
246 const auto manifestName = manifest.name();
247 auto fileName = manifestName.getSubName(1, manifestName.size() - 2).toUri();
248 auto filePath = dataPath + fileName;
249 vector<bool> fileBitMap(manifest.catalog().size());
250 auto fbits = fs::fstream::out | fs::fstream::binary;
251 // if file exists, use in O/W use concatenate mode
252 fbits |= fs::exists(filePath) ? fs::fstream::in : fs::fstream::ate;
253 auto s = std::make_shared<fs::fstream>(filePath, fbits);
254 if (!*s) {
255 BOOST_THROW_EXCEPTION(io::Error("Cannot open: " + dataPath));
256 }
257 return std::make_pair(s, fileBitMap);
258}
259
Mickey Sweatt527b0492016-03-02 11:07:48 -0800260void TorrentManager::Initialize()
261{
262 // .../<torrent_name>/torrent-file/<implicit_digest>
263 string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
264 string manifestPath = dataPath +"/manifests";
265 string torrentFilePath = dataPath +"/torrent_files";
266
267 // get the torrent file segments and manifests that we have.
268 if (!fs::exists(torrentFilePath)) {
269 return;
270 }
271 m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
272 if (m_torrentSegments.empty()) {
273 return;
274 }
275 m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
276 auto currTorrentFile_it = m_torrentSegments.begin();
277 for (const auto& m : m_fileManifests) {
278 // find the appropriate torrent file
279 auto currCatalog = currTorrentFile_it->getCatalog();
280 while (currCatalog.end() == std::find(currCatalog.begin(), currCatalog.end(), m.getFullName()))
281 {
282 ++currTorrentFile_it;
283 currCatalog = currTorrentFile_it->getCatalog();
284 }
285 // construct the file name
286 auto fileName = m.name().getSubName(1, m.name().size() - 2).toUri();
Mickey Sweattafda1f12016-04-04 17:15:11 -0700287 fs::path filePath = m_dataPath + fileName;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800288 // If there are any valid packets, add corresponding state to manager
Mickey Sweattafda1f12016-04-04 17:15:11 -0700289 if (!fs::exists(filePath)) {
290 boost::filesystem::create_directories(filePath.parent_path());
291 continue;
292 }
293 auto packets = intializeDataPackets(filePath.string(), m, *currTorrentFile_it);
Mickey Sweatt527b0492016-03-02 11:07:48 -0800294 if (!packets.empty()) {
Mickey Sweattafda1f12016-04-04 17:15:11 -0700295 m_fileStates[m.getFullName()] = initializeFileState(m_dataPath, m);
296 auto& fileBitMap = m_fileStates[m.getFullName()].second;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800297 auto read_it = packets.begin();
298 size_t i = 0;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700299 for (auto name : m.catalog()) {
Mickey Sweatt527b0492016-03-02 11:07:48 -0800300 if (name == read_it->getFullName()) {
301 ++read_it;
Mickey Sweattafda1f12016-04-04 17:15:11 -0700302 fileBitMap[i] = true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800303 }
304 ++i;
305 }
306 for (const auto& d : packets) {
307 seed(d);
308 }
Mickey Sweatt527b0492016-03-02 11:07:48 -0800309 }
310 }
311 for (const auto& t : m_torrentSegments) {
312 seed(t);
313 }
314 for (const auto& m : m_fileManifests) {
315 seed(m);
316 }
Mickey Sweattafda1f12016-04-04 17:15:11 -0700317}
Mickey Sweatt527b0492016-03-02 11:07:48 -0800318
Mickey Sweattafda1f12016-04-04 17:15:11 -0700319bool TorrentManager::writeData(const Data& packet)
320{
321 // find correct manifest
322 const auto& packetName = packet.getName();
323 auto manifest_it = std::find_if(m_fileManifests.begin(), m_fileManifests.end(),
324 [&packetName](const FileManifest& m) {
325 return m.getName().isPrefixOf(packetName);
326 });
327 if (m_fileManifests.end() == manifest_it) {
328 return false;
329 }
330 // get file state out
331 auto& fileState = m_fileStates[manifest_it->getFullName()];
332 // if there is no open stream to the file
333 if (nullptr == fileState.first) {
334 fileState = initializeFileState(m_dataPath, *manifest_it);
335 }
336 auto packetNum = packetName.get(packet.getName().size() - 1).toSequenceNumber();
337 // if we already have the packet, do not rewrite it.
338 if (fileState.second[packetNum]) {
339 return false;
340 }
341 auto packetOffset = packetNum * manifest_it->data_packet_size();
342 // write data to disk
343 fileState.first->seekg(packetOffset);
344 try {
345 auto content = packet.getContent();
346 std::vector<char> data(content.value_begin(), content.value_end());
347 fileState.first->write(&data[0], data.size());
348 }
349 catch (io::Error &e) {
350 std::cerr << e.what() << std::endl;
351 return false;
352 }
353 // update bitmap
354 fileState.second[packetNum] = true;
355 return true;
Mickey Sweatt527b0492016-03-02 11:07:48 -0800356}
357
358void TorrentManager::seed(const Data& data) const {
359 // TODO(msweatt) IMPLEMENT ME
360}
361
362} // end ntorrent
363} // end ndn