blob: 305fa3b6afe993a4ea23c39325aa6ff0e170ea09 [file] [log] [blame]
Mickey Sweatt527b0492016-03-02 11:07:48 -08001#include "torrent-manager.hpp"
2
3#include "file-manifest.hpp"
4#include "torrent-file.hpp"
5
6#include <boost/filesystem.hpp>
7#include <boost/filesystem/fstream.hpp>
8
9#include <ndn-cxx/data.hpp>
10#include <ndn-cxx/security/key-chain.hpp>
11#include <ndn-cxx/security/signing-helpers.hpp>
12#include <ndn-cxx/util/io.hpp>
13
14#include <set>
15#include <string>
16#include <unordered_map>
17#include <vector>
18
19namespace fs = boost::filesystem;
20
21using std::string;
22using std::vector;
23
24namespace {
25// TODO(msweatt) Move this to a utility
26template<typename T>
27static vector<T>
28load_directory(const string& dirPath,
29 ndn::io::IoEncoding encoding = ndn::io::IoEncoding::BASE_64) {
30 vector<T> structures;
31
32 if (fs::exists(dirPath)) {
33 for(fs::directory_iterator it(dirPath);
34 it != fs::directory_iterator();
35 ++it)
36 {
37 auto data_ptr = ndn::io::load<T>(it->path().string(), encoding);
38 if (nullptr != data_ptr) {
39 structures.push_back(*data_ptr);
40 }
41 }
42 }
43 structures.shrink_to_fit();
44 return structures;
45}
46
47} // end anonymous
48
49namespace ndn {
50namespace ntorrent {
51
52// TODO(msweatt) Move this to a utility
53static vector<ndn::Data>
54packetize_file(const fs::path& filePath,
55 const ndn::Name& commonPrefix,
56 size_t dataPacketSize,
57 size_t subManifestSize,
58 size_t subManifestNum)
59{
60 BOOST_ASSERT(0 < dataPacketSize);
61 size_t APPROX_BUFFER_SIZE = std::numeric_limits<int>::max(); // 2 * 1024 * 1024 *1024
62 auto file_size = fs::file_size(filePath);
63 auto start_offset = subManifestNum * subManifestSize * dataPacketSize;
64 // determine the number of bytes in this submanifest
65 auto subManifestLength = subManifestSize * dataPacketSize;
66 auto remainingFileLength = file_size - start_offset;
67 subManifestLength = remainingFileLength < subManifestLength
68 ? remainingFileLength
69 : subManifestLength;
70 vector<ndn::Data> packets;
71 packets.reserve(subManifestLength/dataPacketSize + 1);
72 fs::ifstream fs(filePath, fs::ifstream::binary);
73 if (!fs) {
74 BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when opening" + filePath.string()));
75 }
76 // ensure that buffer is large enough to contain whole packets
77 // buffer size is either the entire file or the smallest number of data packets >= 2 GB
78 auto buffer_size =
79 subManifestLength < APPROX_BUFFER_SIZE ?
80 subManifestLength :
81 APPROX_BUFFER_SIZE % dataPacketSize == 0 ?
82 APPROX_BUFFER_SIZE :
83 APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize);
84 vector<char> file_bytes;
85 file_bytes.reserve(buffer_size);
86 size_t bytes_read = 0;
87 fs.seekg(start_offset);
88 while(fs && bytes_read < subManifestLength && !fs.eof()) {
89 // read the file into the buffer
90 fs.read(&file_bytes.front(), buffer_size);
91 auto read_size = fs.gcount();
92 if (fs.bad() || read_size < 0) {
93 BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when reading" + filePath.string()));
94 }
95 bytes_read += read_size;
96 char *curr_start = &file_bytes.front();
97 for (size_t i = 0u; i < buffer_size; i += dataPacketSize) {
98 // Build a packet from the data
99 Name packetName = commonPrefix;
100 packetName.appendSequenceNumber(packets.size());
101 Data d(packetName);
102 auto content_length = i + dataPacketSize > buffer_size ? buffer_size - i : dataPacketSize;
103 d.setContent(encoding::makeBinaryBlock(tlv::Content, curr_start, content_length));
104 curr_start += content_length;
105 // append to the collection
106 packets.push_back(d);
107 }
108 file_bytes.clear();
109 // recompute the buffer_size
110 buffer_size =
111 subManifestLength - bytes_read < APPROX_BUFFER_SIZE ?
112 subManifestLength - bytes_read :
113 APPROX_BUFFER_SIZE % dataPacketSize == 0 ?
114 APPROX_BUFFER_SIZE :
115 APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize);
116 }
117 fs.close();
118 packets.shrink_to_fit();
119 ndn::security::KeyChain key_chain;
120 // sign all the packets
121 for (auto& p : packets) {
122 key_chain.sign(p, signingWithSha256());
123 }
124 return packets;
125}
126
127static vector<TorrentFile>
128intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName)
129{
130 security::KeyChain key_chain;
131 Name currSegmentFullName = initialSegmentName;
132 vector<TorrentFile> torrentSegments = load_directory<TorrentFile>(torrentFilePath);
133 // Starting with the initial segment name, verify the names, loading next name from torrentSegment
134 for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) {
135 TorrentFile& segment = *it;
136 key_chain.sign(segment, signingWithSha256());
137 if (segment.getFullName() != currSegmentFullName) {
138 vector<TorrentFile> correctSegments(torrentSegments.begin(), it);
139 torrentSegments.swap(correctSegments);
140 break;
141 }
142 // load the next full name
143 if (nullptr == segment.getTorrentFilePtr()) {
144 break;
145 }
146 currSegmentFullName = *segment.getTorrentFilePtr();
147 }
148 return torrentSegments;
149}
150
151static vector<FileManifest>
152intializeFileManifests(const string& manifestPath, vector<TorrentFile> torrentSegments)
153{
154 security::KeyChain key_chain;
155
156 vector<FileManifest> manifests = load_directory<FileManifest>(manifestPath);
157
158 // sign the manifests
159 std::for_each(manifests.begin(), manifests.end(),
160 [&key_chain](FileManifest& m){
161 key_chain.sign(m,signingWithSha256());
162 });
163
164 // put all names of manifests from the valid torrent files into a set
165 std::set<ndn::Name> validManifestNames;
166 for (const auto& segment : torrentSegments) {
167 const auto& catalog = segment.getCatalog();
168 validManifestNames.insert(catalog.begin(), catalog.end());
169 }
170
171 // put all names of file manifests from disk into a set
172 std::set<ndn::Name> loadedManifestNames;
173 std::for_each(manifests.begin(), manifests.end(),
174 [&loadedManifestNames](const FileManifest& m){
175 loadedManifestNames.insert(m.getFullName());
176 });
177
178 // the set of fileManifests that we have is simply the intersection
179 std::set<Name> output;
180 std::set_intersection(validManifestNames.begin() , validManifestNames.end(),
181 loadedManifestNames.begin(), loadedManifestNames.end(),
182 std::inserter(output, output.begin()));
183
184 // filter out those manifests that are not in this set
185 std::remove_if(manifests.begin(),
186 manifests.end(),
187 [&output](const FileManifest& m) {
188 return (output.end() == output.find(m.name()));
189 });
190
191 // order the manifests in the same order they are in the torrent
192 std::vector<Name> catalogNames;
193 for (const auto& segment : torrentSegments) {
194 const auto& catalog = segment.getCatalog();
195 catalogNames.insert(catalogNames.end(), catalog.begin(), catalog.end());
196 }
197 size_t curr_index = 0;
198 for (auto name : catalogNames) {
199 auto it = std::find_if(manifests.begin(), manifests.end(),
200 [&name](const FileManifest& m) {
201 return m.getFullName() == name;
202 });
203 if (it != manifests.end()) {
204 // not already in the correct position
205 if (it != manifests.begin() + curr_index) {
206 std::swap(manifests[curr_index], *it);
207 }
208 ++curr_index;
209 }
210 }
211
212 return manifests;
213}
214
215static vector<Data>
216intializeDataPackets(const string& filePath,
217 const FileManifest manifest,
218 const TorrentFile& torrentFile)
219{
220 vector<Data> packets;
221 auto subManifestNum = manifest.name().get(manifest.name().size() - 1).toSequenceNumber();
222
223 packets = packetize_file(filePath,
224 manifest.name(),
225 manifest.data_packet_size(),
226 manifest.catalog().size(),
227 subManifestNum);
228
229 auto catalog = manifest.catalog();
230
231 // Filter out invalid packet names
232 std::remove_if(packets.begin(), packets.end(),
233 [&packets, &catalog](const Data& p) {
234 return catalog.end() == std::find(catalog.begin(),
235 catalog.end(),
236 p.getFullName());
237 });
238 return packets;
239}
240
241void TorrentManager::Initialize()
242{
243 // .../<torrent_name>/torrent-file/<implicit_digest>
244 string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri();
245 string manifestPath = dataPath +"/manifests";
246 string torrentFilePath = dataPath +"/torrent_files";
247
248 // get the torrent file segments and manifests that we have.
249 if (!fs::exists(torrentFilePath)) {
250 return;
251 }
252 m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName);
253 if (m_torrentSegments.empty()) {
254 return;
255 }
256 m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments);
257 auto currTorrentFile_it = m_torrentSegments.begin();
258 for (const auto& m : m_fileManifests) {
259 // find the appropriate torrent file
260 auto currCatalog = currTorrentFile_it->getCatalog();
261 while (currCatalog.end() == std::find(currCatalog.begin(), currCatalog.end(), m.getFullName()))
262 {
263 ++currTorrentFile_it;
264 currCatalog = currTorrentFile_it->getCatalog();
265 }
266 // construct the file name
267 auto fileName = m.name().getSubName(1, m.name().size() - 2).toUri();
268 auto filePath = m_dataPath + fileName;
269 // If there are any valid packets, add corresponding state to manager
270 auto packets = intializeDataPackets(filePath, m, *currTorrentFile_it);
271 if (!packets.empty()) {
272 // build the bit map
273 auto catalog = m.catalog();
274 vector<bool> fileBitMap(catalog.size());
275 auto read_it = packets.begin();
276 size_t i = 0;
277 for (auto name : catalog) {
278 if (name == read_it->getFullName()) {
279 ++read_it;
280 fileBitMap[i]= true;
281 }
282 ++i;
283 }
284 for (const auto& d : packets) {
285 seed(d);
286 }
287 auto s = std::make_shared<fs::fstream>(filePath, fs::fstream::binary
288 | fs::fstream::in
289 | fs::fstream::out);
290 m_fileStates[m.getFullName()] = std::make_pair(s, fileBitMap);
291 }
292 }
293 for (const auto& t : m_torrentSegments) {
294 seed(t);
295 }
296 for (const auto& m : m_fileManifests) {
297 seed(m);
298 }
299
300}
301
302void TorrentManager::seed(const Data& data) const {
303 // TODO(msweatt) IMPLEMENT ME
304}
305
306} // end ntorrent
307} // end ndn