blob: 7dd92e5b672e4d6f2edd82e58b4455fcf92935fc [file] [log] [blame]
Shuo Chenccfbe242014-04-29 23:57:51 +08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Junxiao Shi959c5b92016-08-23 01:05:27 +00003 * Copyright (c) 2014-2016, Regents of the University of California.
Shuo Chenccfbe242014-04-29 23:57:51 +08004 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. if (not, see <http://www.gnu.org/licenses/>.
18 */
19
Alexander Afanasyev39d98072014-05-04 12:46:29 -070020#include "../src/repo-command-parameter.hpp"
21#include "../src/repo-command-response.hpp"
Shuo Chenccfbe242014-04-29 23:57:51 +080022
23#include <ndn-cxx/face.hpp>
24#include <ndn-cxx/security/key-chain.hpp>
Junxiao Shi959c5b92016-08-23 01:05:27 +000025#include <ndn-cxx/security/signing-helpers.hpp>
Wentao Shang91fb4f22014-05-20 10:55:22 -070026#include <ndn-cxx/util/scheduler.hpp>
Shuo Chenccfbe242014-04-29 23:57:51 +080027#include <fstream>
28#include <string>
29#include <stdlib.h>
Wentao Shang91fb4f22014-05-20 10:55:22 -070030#include <stdint.h>
Shuo Chenccfbe242014-04-29 23:57:51 +080031#include <boost/filesystem.hpp>
Wentao Shang91fb4f22014-05-20 10:55:22 -070032#include <boost/lexical_cast.hpp>
33#include <boost/asio.hpp>
Shuo Chenccfbe242014-04-29 23:57:51 +080034#include <boost/iostreams/operations.hpp>
35#include <boost/iostreams/read.hpp>
36
37namespace repo {
38
39using namespace ndn::time;
40
Wentao Shanga8f3c402014-10-30 14:03:27 -070041using std::shared_ptr;
42using std::make_shared;
43using std::bind;
44using std::placeholders::_1;
45using std::placeholders::_2;
46
Shuo Chenccfbe242014-04-29 23:57:51 +080047static const uint64_t DEFAULT_BLOCK_SIZE = 1000;
48static const uint64_t DEFAULT_INTEREST_LIFETIME = 4000;
49static const uint64_t DEFAULT_FRESHNESS_PERIOD = 10000;
50static const uint64_t DEFAULT_CHECK_PERIOD = 1000;
Weiqi Shi5822e342014-08-21 20:05:30 -070051static const size_t PRE_SIGN_DATA_COUNT = 11;
Shuo Chenccfbe242014-04-29 23:57:51 +080052
53class NdnPutFile : ndn::noncopyable
54{
55public:
56 class Error : public std::runtime_error
57 {
58 public:
59 explicit
60 Error(const std::string& what)
61 : std::runtime_error(what)
62 {
63 }
64 };
65
66 NdnPutFile()
67 : isUnversioned(false)
68 , isSingle(false)
69 , useDigestSha256(false)
70 , freshnessPeriod(DEFAULT_FRESHNESS_PERIOD)
71 , interestLifetime(DEFAULT_INTEREST_LIFETIME)
72 , hasTimeout(false)
73 , timeout(0)
74 , insertStream(0)
75 , isVerbose(false)
Shuo Chenccfbe242014-04-29 23:57:51 +080076 , m_scheduler(m_face.getIoService())
77 , m_timestampVersion(toUnixTimestamp(system_clock::now()).count())
78 , m_processId(0)
79 , m_checkPeriod(DEFAULT_CHECK_PERIOD)
80 , m_currentSegmentNo(0)
81 , m_isFinished(false)
82 {
83 }
84
85 void
86 run();
87
88private:
89 void
90 prepareNextData(uint64_t referenceSegmentNo);
91
92 void
93 startInsertCommand();
94
95 void
96 onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data);
97
98 void
99 onInsertCommandTimeout(const ndn::Interest& interest);
100
101 void
102 onInterest(const ndn::Name& prefix, const ndn::Interest& interest);
103
104 void
105 onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest);
106
107 void
Wentao Shang91fb4f22014-05-20 10:55:22 -0700108 onRegisterSuccess(const ndn::Name& prefix);
109
110 void
Shuo Chenccfbe242014-04-29 23:57:51 +0800111 onRegisterFailed(const ndn::Name& prefix, const std::string& reason);
112
113 void
114 stopProcess();
115
116 void
117 signData(ndn::Data& data);
118
119 void
120 startCheckCommand();
121
122 void
123 onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data);
124
125 void
126 onCheckCommandTimeout(const ndn::Interest& interest);
127
128 ndn::Interest
129 generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
130 const RepoCommandParameter& commandParameter);
131
132public:
133 bool isUnversioned;
134 bool isSingle;
135 bool useDigestSha256;
136 std::string identityForData;
137 std::string identityForCommand;
138 milliseconds freshnessPeriod;
139 milliseconds interestLifetime;
140 bool hasTimeout;
141 milliseconds timeout;
142 ndn::Name repoPrefix;
143 ndn::Name ndnName;
144 std::istream* insertStream;
145 bool isVerbose;
146
147private:
148 ndn::Face m_face;
149 ndn::Scheduler m_scheduler;
150 ndn::KeyChain m_keyChain;
Shuo Chenccfbe242014-04-29 23:57:51 +0800151 uint64_t m_timestampVersion;
152 uint64_t m_processId;
153 milliseconds m_checkPeriod;
154 size_t m_currentSegmentNo;
155 bool m_isFinished;
156 ndn::Name m_dataPrefix;
157
Wentao Shanga8f3c402014-10-30 14:03:27 -0700158 typedef std::map<uint64_t, shared_ptr<ndn::Data> > DataContainer;
Shuo Chenccfbe242014-04-29 23:57:51 +0800159 DataContainer m_data;
160};
161
162void
163NdnPutFile::prepareNextData(uint64_t referenceSegmentNo)
164{
165 // make sure m_data has [referenceSegmentNo, referenceSegmentNo + PRE_SIGN_DATA_COUNT] Data
166 if (m_isFinished)
167 return;
168
169 size_t nDataToPrepare = PRE_SIGN_DATA_COUNT;
170
171 if (!m_data.empty()) {
172 uint64_t maxSegmentNo = m_data.rbegin()->first;
173
174 if (maxSegmentNo - referenceSegmentNo >= nDataToPrepare) {
175 // nothing to prepare
176 return;
177 }
178
179 nDataToPrepare -= maxSegmentNo - referenceSegmentNo;
180 }
181
182 for (size_t i = 0; i < nDataToPrepare && !m_isFinished; ++i) {
183 uint8_t buffer[DEFAULT_BLOCK_SIZE];
184
185 std::streamsize readSize =
186 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
187
188 if (readSize <= 0) {
189 throw Error("Error reading from the input stream");
190 }
191
Wentao Shanga8f3c402014-10-30 14:03:27 -0700192 shared_ptr<ndn::Data> data =
193 make_shared<ndn::Data>(Name(m_dataPrefix)
Shuo Chenccfbe242014-04-29 23:57:51 +0800194 .appendSegment(m_currentSegmentNo));
195
196 if (insertStream->peek() == std::istream::traits_type::eof()) {
Weiqi Shi5822e342014-08-21 20:05:30 -0700197 data->setFinalBlockId(ndn::name::Component::fromSegment(m_currentSegmentNo));
Shuo Chenccfbe242014-04-29 23:57:51 +0800198 m_isFinished = true;
199 }
200
201 data->setContent(buffer, readSize);
202 data->setFreshnessPeriod(freshnessPeriod);
203 signData(*data);
204
205 m_data.insert(std::make_pair(m_currentSegmentNo, data));
206
207 ++m_currentSegmentNo;
208 }
209}
210
211void
212NdnPutFile::run()
213{
214 m_dataPrefix = ndnName;
215 if (!isUnversioned)
216 m_dataPrefix.appendVersion(m_timestampVersion);
217
218 if (isVerbose)
219 std::cerr << "setInterestFilter for " << m_dataPrefix << std::endl;
220 m_face.setInterestFilter(m_dataPrefix,
221 isSingle ?
Wentao Shanga8f3c402014-10-30 14:03:27 -0700222 bind(&NdnPutFile::onSingleInterest, this, _1, _2)
223 :
224 bind(&NdnPutFile::onInterest, this, _1, _2),
225 bind(&NdnPutFile::onRegisterSuccess, this, _1),
226 bind(&NdnPutFile::onRegisterFailed, this, _1, _2));
Shuo Chenccfbe242014-04-29 23:57:51 +0800227
Weiqi Shi5822e342014-08-21 20:05:30 -0700228
Shuo Chenccfbe242014-04-29 23:57:51 +0800229 if (hasTimeout)
Wentao Shanga8f3c402014-10-30 14:03:27 -0700230 m_scheduler.scheduleEvent(timeout, bind(&NdnPutFile::stopProcess, this));
Shuo Chenccfbe242014-04-29 23:57:51 +0800231
232 m_face.processEvents();
233}
234
235void
Wentao Shang91fb4f22014-05-20 10:55:22 -0700236NdnPutFile::onRegisterSuccess(const Name& prefix)
237{
238 startInsertCommand();
239}
240
241void
Shuo Chenccfbe242014-04-29 23:57:51 +0800242NdnPutFile::startInsertCommand()
243{
244 RepoCommandParameter parameters;
245 parameters.setName(m_dataPrefix);
246 if (!isSingle) {
247 parameters.setStartBlockId(0);
248 }
249
250 ndn::Interest commandInterest = generateCommandInterest(repoPrefix, "insert", parameters);
251 m_face.expressInterest(commandInterest,
Wentao Shanga8f3c402014-10-30 14:03:27 -0700252 bind(&NdnPutFile::onInsertCommandResponse, this, _1, _2),
253 bind(&NdnPutFile::onInsertCommandTimeout, this, _1));
Shuo Chenccfbe242014-04-29 23:57:51 +0800254}
255
256void
257NdnPutFile::onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data)
258{
259 RepoCommandResponse response(data.getContent().blockFromValue());
260 int statusCode = response.getStatusCode();
261 if (statusCode >= 400) {
262 throw Error("insert command failed with code " +
263 boost::lexical_cast<std::string>(statusCode));
264 }
265 m_processId = response.getProcessId();
266
267 m_scheduler.scheduleEvent(m_checkPeriod,
Wentao Shanga8f3c402014-10-30 14:03:27 -0700268 bind(&NdnPutFile::startCheckCommand, this));
Shuo Chenccfbe242014-04-29 23:57:51 +0800269}
270
271void
272NdnPutFile::onInsertCommandTimeout(const ndn::Interest& interest)
273{
274 throw Error("command response timeout");
275}
276
277void
278NdnPutFile::onInterest(const ndn::Name& prefix, const ndn::Interest& interest)
279{
280 if (interest.getName().size() != prefix.size() + 1) {
281 if (isVerbose) {
282 std::cerr << "Error processing incoming interest " << interest << ": "
283 << "Unrecognized Interest" << std::endl;
284 }
285 return;
286 }
287
288 uint64_t segmentNo;
289 try {
290 ndn::Name::Component segmentComponent = interest.getName().get(prefix.size());
291 segmentNo = segmentComponent.toSegment();
292 }
293 catch (tlv::Error& e) {
294 if (isVerbose) {
295 std::cerr << "Error processing incoming interest " << interest << ": "
296 << e.what() << std::endl;
297 }
298 return;
299 }
300
301 prepareNextData(segmentNo);
302
303 DataContainer::iterator item = m_data.find(segmentNo);
304 if (item == m_data.end()) {
305 if (isVerbose) {
306 std::cerr << "Requested segment [" << segmentNo << "] does not exist" << std::endl;
307 }
308 return;
309 }
310
Weiqi Shi5822e342014-08-21 20:05:30 -0700311 if (m_isFinished) {
312 uint64_t final = m_currentSegmentNo - 1;
313 item->second->setFinalBlockId(ndn::name::Component::fromSegment(final));
314
315 }
Shuo Chenccfbe242014-04-29 23:57:51 +0800316 m_face.put(*item->second);
317}
318
319void
320NdnPutFile::onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest)
321{
322 BOOST_ASSERT(prefix == m_dataPrefix);
323
324 if (prefix != interest.getName()) {
325 if (isVerbose) {
326 std::cerr << "Received unexpected interest " << interest << std::endl;
327 }
328 return;
329 }
330
331 uint8_t buffer[DEFAULT_BLOCK_SIZE];
332 std::streamsize readSize =
333 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
334
335 if (readSize <= 0) {
336 throw Error("Error reading from the input stream");
337 }
338
339 if (insertStream->peek() != std::istream::traits_type::eof()) {
340 throw Error("Input data does not fit into one Data packet");
341 }
342
Wentao Shanga8f3c402014-10-30 14:03:27 -0700343 shared_ptr<ndn::Data> data = make_shared<ndn::Data>(m_dataPrefix);
Weiqi Shi5822e342014-08-21 20:05:30 -0700344 data->setContent(buffer, readSize);
345 data->setFreshnessPeriod(freshnessPeriod);
346 signData(*data);
347 m_face.put(*data);
Shuo Chenccfbe242014-04-29 23:57:51 +0800348
349 m_isFinished = true;
350}
351
352void
353NdnPutFile::onRegisterFailed(const ndn::Name& prefix, const std::string& reason)
354{
355 throw Error("onRegisterFailed: " + reason);
356}
357
358void
359NdnPutFile::stopProcess()
360{
361 m_face.getIoService().stop();
362}
363
364void
365NdnPutFile::signData(ndn::Data& data)
366{
367 if (useDigestSha256) {
368 m_keyChain.signWithSha256(data);
369 }
370 else {
371 if (identityForData.empty())
372 m_keyChain.sign(data);
373 else {
374 ndn::Name keyName = m_keyChain.getDefaultKeyNameForIdentity(ndn::Name(identityForData));
375 ndn::Name certName = m_keyChain.getDefaultCertificateNameForKey(keyName);
376 m_keyChain.sign(data, certName);
377 }
378 }
379}
380
381void
382NdnPutFile::startCheckCommand()
383{
384 ndn::Interest checkInterest = generateCommandInterest(repoPrefix, "insert check",
385 RepoCommandParameter()
386 .setProcessId(m_processId));
387 m_face.expressInterest(checkInterest,
Wentao Shanga8f3c402014-10-30 14:03:27 -0700388 bind(&NdnPutFile::onCheckCommandResponse, this, _1, _2),
389 bind(&NdnPutFile::onCheckCommandTimeout, this, _1));
Shuo Chenccfbe242014-04-29 23:57:51 +0800390}
391
392void
393NdnPutFile::onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data)
394{
395 RepoCommandResponse response(data.getContent().blockFromValue());
396 int statusCode = response.getStatusCode();
397 if (statusCode >= 400) {
398 throw Error("Insert check command failed with code: " +
399 boost::lexical_cast<std::string>(statusCode));
400 }
401
402 if (m_isFinished) {
403 uint64_t insertCount = response.getInsertNum();
404
405 if (isSingle) {
406 if (insertCount == 1) {
407 m_face.getIoService().stop();
408 return;
409 }
410 }
411 // Technically, the check should not infer, but directly has signal from repo that
412 // write operation has been finished
413
414 if (insertCount == m_currentSegmentNo) {
415 m_face.getIoService().stop();
416 return;
417 }
418 }
419
420 m_scheduler.scheduleEvent(m_checkPeriod,
Wentao Shanga8f3c402014-10-30 14:03:27 -0700421 bind(&NdnPutFile::startCheckCommand, this));
Shuo Chenccfbe242014-04-29 23:57:51 +0800422}
423
424void
425NdnPutFile::onCheckCommandTimeout(const ndn::Interest& interest)
426{
427 throw Error("check response timeout");
428}
429
430ndn::Interest
431NdnPutFile::generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
432 const RepoCommandParameter& commandParameter)
433{
434 ndn::Interest interest(ndn::Name(commandPrefix)
435 .append(command)
436 .append(commandParameter.wireEncode()));
437 interest.setInterestLifetime(interestLifetime);
438
439 if (identityForCommand.empty())
Junxiao Shi959c5b92016-08-23 01:05:27 +0000440 m_keyChain.sign(interest);
Shuo Chenccfbe242014-04-29 23:57:51 +0800441 else {
Junxiao Shi959c5b92016-08-23 01:05:27 +0000442 m_keyChain.sign(interest, ndn::signingByIdentity(identityForCommand));
Shuo Chenccfbe242014-04-29 23:57:51 +0800443 }
444
445 return interest;
446}
447
448static void
449usage()
450{
451 fprintf(stderr,
452 "ndnputfile [-u] [-s] [-D] [-d] [-i identity] [-I identity]"
453 " [-x freshness] [-l lifetime] [-w timeout] repo-prefix ndn-name filename\n"
454 "\n"
455 " Write a file into a repo.\n"
456 " -u: unversioned: do not add a version component\n"
457 " -s: single: do not add version or segment component, implies -u\n"
458 " -D: use DigestSha256 signing method instead of SignatureSha256WithRsa\n"
459 " -i: specify identity used for signing Data\n"
460 " -I: specify identity used for signing commands\n"
461 " -x: FreshnessPeriod in milliseconds\n"
462 " -l: InterestLifetime in milliseconds for each command\n"
463 " -w: timeout in milliseconds for whole process (default unlimited)\n"
464 " -v: be verbose\n"
465 " repo-prefix: repo command prefix\n"
466 " ndn-name: NDN Name prefix for written Data\n"
467 " filename: local file name; \"-\" reads from stdin\n"
468 );
469 exit(1);
470}
471
472int
473main(int argc, char** argv)
474{
475 NdnPutFile ndnPutFile;
476 int opt;
477 while ((opt = getopt(argc, argv, "usDi:I:x:l:w:vh")) != -1) {
478 switch (opt) {
479 case 'u':
480 ndnPutFile.isUnversioned = true;
481 break;
482 case 's':
483 ndnPutFile.isSingle = true;
484 break;
485 case 'D':
486 ndnPutFile.useDigestSha256 = true;
487 break;
488 case 'i':
489 ndnPutFile.identityForData = std::string(optarg);
490 break;
491 case 'I':
492 ndnPutFile.identityForCommand = std::string(optarg);
493 break;
494 case 'x':
495 try {
496 ndnPutFile.freshnessPeriod = milliseconds(boost::lexical_cast<uint64_t>(optarg));
497 }
498 catch (boost::bad_lexical_cast&) {
499 std::cerr << "-x option should be an integer.";
500 return 1;
501 }
502 break;
503 case 'l':
504 try {
505 ndnPutFile.interestLifetime = milliseconds(boost::lexical_cast<uint64_t>(optarg));
506 }
507 catch (boost::bad_lexical_cast&) {
508 std::cerr << "-l option should be an integer.";
509 return 1;
510 }
511 break;
512 case 'w':
513 ndnPutFile.hasTimeout = true;
514 try {
515 ndnPutFile.timeout = milliseconds(boost::lexical_cast<uint64_t>(optarg));
516 }
517 catch (boost::bad_lexical_cast&) {
518 std::cerr << "-w option should be an integer.";
519 return 1;
520 }
521 break;
522 case 'v':
523 ndnPutFile.isVerbose = true;
524 break;
525 case 'h':
526 usage();
527 break;
528 default:
529 break;
530 }
531 }
532
533 argc -= optind;
534 argv += optind;
535
536 if (argc != 3)
537 usage();
538
539 ndnPutFile.repoPrefix = Name(argv[0]);
540 ndnPutFile.ndnName = Name(argv[1]);
541 if (strcmp(argv[2], "-") == 0) {
542
543 ndnPutFile.insertStream = &std::cin;
544 ndnPutFile.run();
545 }
546 else {
547 std::ifstream inputFileStream(argv[2], std::ios::in | std::ios::binary);
548 if (!inputFileStream.is_open()) {
549 std::cerr << "ERROR: cannot open " << argv[2] << std::endl;
550 return 1;
551 }
552
553 ndnPutFile.insertStream = &inputFileStream;
554 ndnPutFile.run();
555 }
556
557 // ndnPutFile MUST NOT be used anymore because .insertStream is a dangling pointer
558
559 return 0;
560}
561
562} // namespace repo
563
564int
565main(int argc, char** argv)
566{
567 try {
568 return repo::main(argc, argv);
569 }
570 catch (std::exception& e) {
571 std::cerr << "ERROR: " << e.what() << std::endl;
572 return 2;
573 }
574}