#include "torrent-manager.hpp"

#include "file-manifest.hpp"
#include "torrent-file.hpp"

#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>

#include <ndn-cxx/data.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/security/signing-helpers.hpp>
#include <ndn-cxx/util/io.hpp>

#include <algorithm>
#include <iterator>
#include <limits>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
| 18 | |
| 19 | namespace fs = boost::filesystem; |
| 20 | |
| 21 | using std::string; |
| 22 | using std::vector; |
| 23 | |
| 24 | namespace { |
| 25 | // TODO(msweatt) Move this to a utility |
| 26 | template<typename T> |
| 27 | static vector<T> |
| 28 | load_directory(const string& dirPath, |
| 29 | ndn::io::IoEncoding encoding = ndn::io::IoEncoding::BASE_64) { |
| 30 | vector<T> structures; |
| 31 | |
| 32 | if (fs::exists(dirPath)) { |
| 33 | for(fs::directory_iterator it(dirPath); |
| 34 | it != fs::directory_iterator(); |
| 35 | ++it) |
| 36 | { |
| 37 | auto data_ptr = ndn::io::load<T>(it->path().string(), encoding); |
| 38 | if (nullptr != data_ptr) { |
| 39 | structures.push_back(*data_ptr); |
| 40 | } |
| 41 | } |
| 42 | } |
| 43 | structures.shrink_to_fit(); |
| 44 | return structures; |
| 45 | } |
| 46 | |
| 47 | } // end anonymous |
| 48 | |
| 49 | namespace ndn { |
| 50 | namespace ntorrent { |
| 51 | |
| 52 | // TODO(msweatt) Move this to a utility |
| 53 | static vector<ndn::Data> |
| 54 | packetize_file(const fs::path& filePath, |
| 55 | const ndn::Name& commonPrefix, |
| 56 | size_t dataPacketSize, |
| 57 | size_t subManifestSize, |
| 58 | size_t subManifestNum) |
| 59 | { |
| 60 | BOOST_ASSERT(0 < dataPacketSize); |
| 61 | size_t APPROX_BUFFER_SIZE = std::numeric_limits<int>::max(); // 2 * 1024 * 1024 *1024 |
| 62 | auto file_size = fs::file_size(filePath); |
| 63 | auto start_offset = subManifestNum * subManifestSize * dataPacketSize; |
| 64 | // determine the number of bytes in this submanifest |
| 65 | auto subManifestLength = subManifestSize * dataPacketSize; |
| 66 | auto remainingFileLength = file_size - start_offset; |
| 67 | subManifestLength = remainingFileLength < subManifestLength |
| 68 | ? remainingFileLength |
| 69 | : subManifestLength; |
| 70 | vector<ndn::Data> packets; |
| 71 | packets.reserve(subManifestLength/dataPacketSize + 1); |
| 72 | fs::ifstream fs(filePath, fs::ifstream::binary); |
| 73 | if (!fs) { |
| 74 | BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when opening" + filePath.string())); |
| 75 | } |
| 76 | // ensure that buffer is large enough to contain whole packets |
| 77 | // buffer size is either the entire file or the smallest number of data packets >= 2 GB |
| 78 | auto buffer_size = |
| 79 | subManifestLength < APPROX_BUFFER_SIZE ? |
| 80 | subManifestLength : |
| 81 | APPROX_BUFFER_SIZE % dataPacketSize == 0 ? |
| 82 | APPROX_BUFFER_SIZE : |
| 83 | APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize); |
| 84 | vector<char> file_bytes; |
| 85 | file_bytes.reserve(buffer_size); |
| 86 | size_t bytes_read = 0; |
| 87 | fs.seekg(start_offset); |
| 88 | while(fs && bytes_read < subManifestLength && !fs.eof()) { |
| 89 | // read the file into the buffer |
| 90 | fs.read(&file_bytes.front(), buffer_size); |
| 91 | auto read_size = fs.gcount(); |
| 92 | if (fs.bad() || read_size < 0) { |
| 93 | BOOST_THROW_EXCEPTION(FileManifest::Error("IO Error when reading" + filePath.string())); |
| 94 | } |
| 95 | bytes_read += read_size; |
| 96 | char *curr_start = &file_bytes.front(); |
| 97 | for (size_t i = 0u; i < buffer_size; i += dataPacketSize) { |
| 98 | // Build a packet from the data |
| 99 | Name packetName = commonPrefix; |
| 100 | packetName.appendSequenceNumber(packets.size()); |
| 101 | Data d(packetName); |
| 102 | auto content_length = i + dataPacketSize > buffer_size ? buffer_size - i : dataPacketSize; |
| 103 | d.setContent(encoding::makeBinaryBlock(tlv::Content, curr_start, content_length)); |
| 104 | curr_start += content_length; |
| 105 | // append to the collection |
| 106 | packets.push_back(d); |
| 107 | } |
| 108 | file_bytes.clear(); |
| 109 | // recompute the buffer_size |
| 110 | buffer_size = |
| 111 | subManifestLength - bytes_read < APPROX_BUFFER_SIZE ? |
| 112 | subManifestLength - bytes_read : |
| 113 | APPROX_BUFFER_SIZE % dataPacketSize == 0 ? |
| 114 | APPROX_BUFFER_SIZE : |
| 115 | APPROX_BUFFER_SIZE + dataPacketSize - (APPROX_BUFFER_SIZE % dataPacketSize); |
| 116 | } |
| 117 | fs.close(); |
| 118 | packets.shrink_to_fit(); |
| 119 | ndn::security::KeyChain key_chain; |
| 120 | // sign all the packets |
| 121 | for (auto& p : packets) { |
| 122 | key_chain.sign(p, signingWithSha256()); |
| 123 | } |
| 124 | return packets; |
| 125 | } |
| 126 | |
| 127 | static vector<TorrentFile> |
| 128 | intializeTorrentSegments(const string& torrentFilePath, const Name& initialSegmentName) |
| 129 | { |
| 130 | security::KeyChain key_chain; |
| 131 | Name currSegmentFullName = initialSegmentName; |
| 132 | vector<TorrentFile> torrentSegments = load_directory<TorrentFile>(torrentFilePath); |
| 133 | // Starting with the initial segment name, verify the names, loading next name from torrentSegment |
| 134 | for (auto it = torrentSegments.begin(); it != torrentSegments.end(); ++it) { |
| 135 | TorrentFile& segment = *it; |
| 136 | key_chain.sign(segment, signingWithSha256()); |
| 137 | if (segment.getFullName() != currSegmentFullName) { |
| 138 | vector<TorrentFile> correctSegments(torrentSegments.begin(), it); |
| 139 | torrentSegments.swap(correctSegments); |
| 140 | break; |
| 141 | } |
| 142 | // load the next full name |
| 143 | if (nullptr == segment.getTorrentFilePtr()) { |
| 144 | break; |
| 145 | } |
| 146 | currSegmentFullName = *segment.getTorrentFilePtr(); |
| 147 | } |
| 148 | return torrentSegments; |
| 149 | } |
| 150 | |
| 151 | static vector<FileManifest> |
| 152 | intializeFileManifests(const string& manifestPath, vector<TorrentFile> torrentSegments) |
| 153 | { |
| 154 | security::KeyChain key_chain; |
| 155 | |
| 156 | vector<FileManifest> manifests = load_directory<FileManifest>(manifestPath); |
| 157 | |
| 158 | // sign the manifests |
| 159 | std::for_each(manifests.begin(), manifests.end(), |
| 160 | [&key_chain](FileManifest& m){ |
| 161 | key_chain.sign(m,signingWithSha256()); |
| 162 | }); |
| 163 | |
| 164 | // put all names of manifests from the valid torrent files into a set |
| 165 | std::set<ndn::Name> validManifestNames; |
| 166 | for (const auto& segment : torrentSegments) { |
| 167 | const auto& catalog = segment.getCatalog(); |
| 168 | validManifestNames.insert(catalog.begin(), catalog.end()); |
| 169 | } |
| 170 | |
| 171 | // put all names of file manifests from disk into a set |
| 172 | std::set<ndn::Name> loadedManifestNames; |
| 173 | std::for_each(manifests.begin(), manifests.end(), |
| 174 | [&loadedManifestNames](const FileManifest& m){ |
| 175 | loadedManifestNames.insert(m.getFullName()); |
| 176 | }); |
| 177 | |
| 178 | // the set of fileManifests that we have is simply the intersection |
| 179 | std::set<Name> output; |
| 180 | std::set_intersection(validManifestNames.begin() , validManifestNames.end(), |
| 181 | loadedManifestNames.begin(), loadedManifestNames.end(), |
| 182 | std::inserter(output, output.begin())); |
| 183 | |
| 184 | // filter out those manifests that are not in this set |
| 185 | std::remove_if(manifests.begin(), |
| 186 | manifests.end(), |
| 187 | [&output](const FileManifest& m) { |
| 188 | return (output.end() == output.find(m.name())); |
| 189 | }); |
| 190 | |
| 191 | // order the manifests in the same order they are in the torrent |
| 192 | std::vector<Name> catalogNames; |
| 193 | for (const auto& segment : torrentSegments) { |
| 194 | const auto& catalog = segment.getCatalog(); |
| 195 | catalogNames.insert(catalogNames.end(), catalog.begin(), catalog.end()); |
| 196 | } |
| 197 | size_t curr_index = 0; |
| 198 | for (auto name : catalogNames) { |
| 199 | auto it = std::find_if(manifests.begin(), manifests.end(), |
| 200 | [&name](const FileManifest& m) { |
| 201 | return m.getFullName() == name; |
| 202 | }); |
| 203 | if (it != manifests.end()) { |
| 204 | // not already in the correct position |
| 205 | if (it != manifests.begin() + curr_index) { |
| 206 | std::swap(manifests[curr_index], *it); |
| 207 | } |
| 208 | ++curr_index; |
| 209 | } |
| 210 | } |
| 211 | |
| 212 | return manifests; |
| 213 | } |
| 214 | |
| 215 | static vector<Data> |
| 216 | intializeDataPackets(const string& filePath, |
| 217 | const FileManifest manifest, |
| 218 | const TorrentFile& torrentFile) |
| 219 | { |
| 220 | vector<Data> packets; |
| 221 | auto subManifestNum = manifest.name().get(manifest.name().size() - 1).toSequenceNumber(); |
| 222 | |
| 223 | packets = packetize_file(filePath, |
| 224 | manifest.name(), |
| 225 | manifest.data_packet_size(), |
| 226 | manifest.catalog().size(), |
| 227 | subManifestNum); |
| 228 | |
| 229 | auto catalog = manifest.catalog(); |
| 230 | |
| 231 | // Filter out invalid packet names |
| 232 | std::remove_if(packets.begin(), packets.end(), |
| 233 | [&packets, &catalog](const Data& p) { |
| 234 | return catalog.end() == std::find(catalog.begin(), |
| 235 | catalog.end(), |
| 236 | p.getFullName()); |
| 237 | }); |
| 238 | return packets; |
| 239 | } |
| 240 | |
| 241 | void TorrentManager::Initialize() |
| 242 | { |
| 243 | // .../<torrent_name>/torrent-file/<implicit_digest> |
| 244 | string dataPath = ".appdata/" + m_torrentFileName.get(-3).toUri(); |
| 245 | string manifestPath = dataPath +"/manifests"; |
| 246 | string torrentFilePath = dataPath +"/torrent_files"; |
| 247 | |
| 248 | // get the torrent file segments and manifests that we have. |
| 249 | if (!fs::exists(torrentFilePath)) { |
| 250 | return; |
| 251 | } |
| 252 | m_torrentSegments = intializeTorrentSegments(torrentFilePath, m_torrentFileName); |
| 253 | if (m_torrentSegments.empty()) { |
| 254 | return; |
| 255 | } |
| 256 | m_fileManifests = intializeFileManifests(manifestPath, m_torrentSegments); |
| 257 | auto currTorrentFile_it = m_torrentSegments.begin(); |
| 258 | for (const auto& m : m_fileManifests) { |
| 259 | // find the appropriate torrent file |
| 260 | auto currCatalog = currTorrentFile_it->getCatalog(); |
| 261 | while (currCatalog.end() == std::find(currCatalog.begin(), currCatalog.end(), m.getFullName())) |
| 262 | { |
| 263 | ++currTorrentFile_it; |
| 264 | currCatalog = currTorrentFile_it->getCatalog(); |
| 265 | } |
| 266 | // construct the file name |
| 267 | auto fileName = m.name().getSubName(1, m.name().size() - 2).toUri(); |
| 268 | auto filePath = m_dataPath + fileName; |
| 269 | // If there are any valid packets, add corresponding state to manager |
| 270 | auto packets = intializeDataPackets(filePath, m, *currTorrentFile_it); |
| 271 | if (!packets.empty()) { |
| 272 | // build the bit map |
| 273 | auto catalog = m.catalog(); |
| 274 | vector<bool> fileBitMap(catalog.size()); |
| 275 | auto read_it = packets.begin(); |
| 276 | size_t i = 0; |
| 277 | for (auto name : catalog) { |
| 278 | if (name == read_it->getFullName()) { |
| 279 | ++read_it; |
| 280 | fileBitMap[i]= true; |
| 281 | } |
| 282 | ++i; |
| 283 | } |
| 284 | for (const auto& d : packets) { |
| 285 | seed(d); |
| 286 | } |
| 287 | auto s = std::make_shared<fs::fstream>(filePath, fs::fstream::binary |
| 288 | | fs::fstream::in |
| 289 | | fs::fstream::out); |
| 290 | m_fileStates[m.getFullName()] = std::make_pair(s, fileBitMap); |
| 291 | } |
| 292 | } |
| 293 | for (const auto& t : m_torrentSegments) { |
| 294 | seed(t); |
| 295 | } |
| 296 | for (const auto& m : m_fileManifests) { |
| 297 | seed(m); |
| 298 | } |
| 299 | |
| 300 | } |
| 301 | |
| 302 | void TorrentManager::seed(const Data& data) const { |
| 303 | // TODO(msweatt) IMPLEMENT ME |
| 304 | } |
| 305 | |
| 306 | } // end ntorrent |
| 307 | } // end ndn |