blob: 9d20a637c2400bcf9f5c1ea5873c2457e2b51f11 [file] [log] [blame]
Shuo Chenccfbe242014-04-29 23:57:51 +08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. if (not, see <http://www.gnu.org/licenses/>.
18 */
19
Alexander Afanasyev39d98072014-05-04 12:46:29 -070020#include "../src/repo-command-parameter.hpp"
21#include "../src/repo-command-response.hpp"
Shuo Chenccfbe242014-04-29 23:57:51 +080022
23#include <ndn-cxx/face.hpp>
24#include <ndn-cxx/security/key-chain.hpp>
Wentao Shang91fb4f22014-05-20 10:55:22 -070025#include <ndn-cxx/util/scheduler.hpp>
Shuo Chenccfbe242014-04-29 23:57:51 +080026#include <ndn-cxx/util/command-interest-generator.hpp>
27#include <fstream>
28#include <string>
29#include <stdlib.h>
Wentao Shang91fb4f22014-05-20 10:55:22 -070030#include <stdint.h>
Shuo Chenccfbe242014-04-29 23:57:51 +080031#include <boost/filesystem.hpp>
Wentao Shang91fb4f22014-05-20 10:55:22 -070032#include <boost/lexical_cast.hpp>
33#include <boost/asio.hpp>
Shuo Chenccfbe242014-04-29 23:57:51 +080034#include <boost/iostreams/operations.hpp>
35#include <boost/iostreams/read.hpp>
36
37namespace repo {
38
39using namespace ndn::time;
40
Wentao Shanga8f3c402014-10-30 14:03:27 -070041using std::shared_ptr;
42using std::make_shared;
43using std::bind;
44using std::placeholders::_1;
45using std::placeholders::_2;
46
Shuo Chenccfbe242014-04-29 23:57:51 +080047static const uint64_t DEFAULT_BLOCK_SIZE = 1000;
48static const uint64_t DEFAULT_INTEREST_LIFETIME = 4000;
49static const uint64_t DEFAULT_FRESHNESS_PERIOD = 10000;
50static const uint64_t DEFAULT_CHECK_PERIOD = 1000;
Weiqi Shi5822e342014-08-21 20:05:30 -070051static const size_t PRE_SIGN_DATA_COUNT = 11;
Shuo Chenccfbe242014-04-29 23:57:51 +080052
53class NdnPutFile : ndn::noncopyable
54{
55public:
56 class Error : public std::runtime_error
57 {
58 public:
59 explicit
60 Error(const std::string& what)
61 : std::runtime_error(what)
62 {
63 }
64 };
65
66 NdnPutFile()
67 : isUnversioned(false)
68 , isSingle(false)
69 , useDigestSha256(false)
70 , freshnessPeriod(DEFAULT_FRESHNESS_PERIOD)
71 , interestLifetime(DEFAULT_INTEREST_LIFETIME)
72 , hasTimeout(false)
73 , timeout(0)
74 , insertStream(0)
75 , isVerbose(false)
76
77 , m_scheduler(m_face.getIoService())
78 , m_timestampVersion(toUnixTimestamp(system_clock::now()).count())
79 , m_processId(0)
80 , m_checkPeriod(DEFAULT_CHECK_PERIOD)
81 , m_currentSegmentNo(0)
82 , m_isFinished(false)
83 {
84 }
85
86 void
87 run();
88
89private:
90 void
91 prepareNextData(uint64_t referenceSegmentNo);
92
93 void
94 startInsertCommand();
95
96 void
97 onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data);
98
99 void
100 onInsertCommandTimeout(const ndn::Interest& interest);
101
102 void
103 onInterest(const ndn::Name& prefix, const ndn::Interest& interest);
104
105 void
106 onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest);
107
108 void
Wentao Shang91fb4f22014-05-20 10:55:22 -0700109 onRegisterSuccess(const ndn::Name& prefix);
110
111 void
Shuo Chenccfbe242014-04-29 23:57:51 +0800112 onRegisterFailed(const ndn::Name& prefix, const std::string& reason);
113
114 void
115 stopProcess();
116
117 void
118 signData(ndn::Data& data);
119
120 void
121 startCheckCommand();
122
123 void
124 onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data);
125
126 void
127 onCheckCommandTimeout(const ndn::Interest& interest);
128
129 ndn::Interest
130 generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
131 const RepoCommandParameter& commandParameter);
132
133public:
134 bool isUnversioned;
135 bool isSingle;
136 bool useDigestSha256;
137 std::string identityForData;
138 std::string identityForCommand;
139 milliseconds freshnessPeriod;
140 milliseconds interestLifetime;
141 bool hasTimeout;
142 milliseconds timeout;
143 ndn::Name repoPrefix;
144 ndn::Name ndnName;
145 std::istream* insertStream;
146 bool isVerbose;
147
148private:
149 ndn::Face m_face;
150 ndn::Scheduler m_scheduler;
151 ndn::KeyChain m_keyChain;
152 ndn::CommandInterestGenerator m_generator;
153 uint64_t m_timestampVersion;
154 uint64_t m_processId;
155 milliseconds m_checkPeriod;
156 size_t m_currentSegmentNo;
157 bool m_isFinished;
158 ndn::Name m_dataPrefix;
159
Wentao Shanga8f3c402014-10-30 14:03:27 -0700160 typedef std::map<uint64_t, shared_ptr<ndn::Data> > DataContainer;
Shuo Chenccfbe242014-04-29 23:57:51 +0800161 DataContainer m_data;
162};
163
164void
165NdnPutFile::prepareNextData(uint64_t referenceSegmentNo)
166{
167 // make sure m_data has [referenceSegmentNo, referenceSegmentNo + PRE_SIGN_DATA_COUNT] Data
168 if (m_isFinished)
169 return;
170
171 size_t nDataToPrepare = PRE_SIGN_DATA_COUNT;
172
173 if (!m_data.empty()) {
174 uint64_t maxSegmentNo = m_data.rbegin()->first;
175
176 if (maxSegmentNo - referenceSegmentNo >= nDataToPrepare) {
177 // nothing to prepare
178 return;
179 }
180
181 nDataToPrepare -= maxSegmentNo - referenceSegmentNo;
182 }
183
184 for (size_t i = 0; i < nDataToPrepare && !m_isFinished; ++i) {
185 uint8_t buffer[DEFAULT_BLOCK_SIZE];
186
187 std::streamsize readSize =
188 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
189
190 if (readSize <= 0) {
191 throw Error("Error reading from the input stream");
192 }
193
Wentao Shanga8f3c402014-10-30 14:03:27 -0700194 shared_ptr<ndn::Data> data =
195 make_shared<ndn::Data>(Name(m_dataPrefix)
Shuo Chenccfbe242014-04-29 23:57:51 +0800196 .appendSegment(m_currentSegmentNo));
197
198 if (insertStream->peek() == std::istream::traits_type::eof()) {
Weiqi Shi5822e342014-08-21 20:05:30 -0700199 data->setFinalBlockId(ndn::name::Component::fromSegment(m_currentSegmentNo));
Shuo Chenccfbe242014-04-29 23:57:51 +0800200 m_isFinished = true;
201 }
202
203 data->setContent(buffer, readSize);
204 data->setFreshnessPeriod(freshnessPeriod);
205 signData(*data);
206
207 m_data.insert(std::make_pair(m_currentSegmentNo, data));
208
209 ++m_currentSegmentNo;
210 }
211}
212
213void
214NdnPutFile::run()
215{
216 m_dataPrefix = ndnName;
217 if (!isUnversioned)
218 m_dataPrefix.appendVersion(m_timestampVersion);
219
220 if (isVerbose)
221 std::cerr << "setInterestFilter for " << m_dataPrefix << std::endl;
222 m_face.setInterestFilter(m_dataPrefix,
223 isSingle ?
Wentao Shanga8f3c402014-10-30 14:03:27 -0700224 bind(&NdnPutFile::onSingleInterest, this, _1, _2)
225 :
226 bind(&NdnPutFile::onInterest, this, _1, _2),
227 bind(&NdnPutFile::onRegisterSuccess, this, _1),
228 bind(&NdnPutFile::onRegisterFailed, this, _1, _2));
Shuo Chenccfbe242014-04-29 23:57:51 +0800229
Weiqi Shi5822e342014-08-21 20:05:30 -0700230
Shuo Chenccfbe242014-04-29 23:57:51 +0800231 if (hasTimeout)
Wentao Shanga8f3c402014-10-30 14:03:27 -0700232 m_scheduler.scheduleEvent(timeout, bind(&NdnPutFile::stopProcess, this));
Shuo Chenccfbe242014-04-29 23:57:51 +0800233
234 m_face.processEvents();
235}
236
237void
Wentao Shang91fb4f22014-05-20 10:55:22 -0700238NdnPutFile::onRegisterSuccess(const Name& prefix)
239{
240 startInsertCommand();
241}
242
243void
Shuo Chenccfbe242014-04-29 23:57:51 +0800244NdnPutFile::startInsertCommand()
245{
246 RepoCommandParameter parameters;
247 parameters.setName(m_dataPrefix);
248 if (!isSingle) {
249 parameters.setStartBlockId(0);
250 }
251
252 ndn::Interest commandInterest = generateCommandInterest(repoPrefix, "insert", parameters);
253 m_face.expressInterest(commandInterest,
Wentao Shanga8f3c402014-10-30 14:03:27 -0700254 bind(&NdnPutFile::onInsertCommandResponse, this, _1, _2),
255 bind(&NdnPutFile::onInsertCommandTimeout, this, _1));
Shuo Chenccfbe242014-04-29 23:57:51 +0800256}
257
258void
259NdnPutFile::onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data)
260{
261 RepoCommandResponse response(data.getContent().blockFromValue());
262 int statusCode = response.getStatusCode();
263 if (statusCode >= 400) {
264 throw Error("insert command failed with code " +
265 boost::lexical_cast<std::string>(statusCode));
266 }
267 m_processId = response.getProcessId();
268
269 m_scheduler.scheduleEvent(m_checkPeriod,
Wentao Shanga8f3c402014-10-30 14:03:27 -0700270 bind(&NdnPutFile::startCheckCommand, this));
Shuo Chenccfbe242014-04-29 23:57:51 +0800271}
272
273void
274NdnPutFile::onInsertCommandTimeout(const ndn::Interest& interest)
275{
276 throw Error("command response timeout");
277}
278
279void
280NdnPutFile::onInterest(const ndn::Name& prefix, const ndn::Interest& interest)
281{
282 if (interest.getName().size() != prefix.size() + 1) {
283 if (isVerbose) {
284 std::cerr << "Error processing incoming interest " << interest << ": "
285 << "Unrecognized Interest" << std::endl;
286 }
287 return;
288 }
289
290 uint64_t segmentNo;
291 try {
292 ndn::Name::Component segmentComponent = interest.getName().get(prefix.size());
293 segmentNo = segmentComponent.toSegment();
294 }
295 catch (tlv::Error& e) {
296 if (isVerbose) {
297 std::cerr << "Error processing incoming interest " << interest << ": "
298 << e.what() << std::endl;
299 }
300 return;
301 }
302
303 prepareNextData(segmentNo);
304
305 DataContainer::iterator item = m_data.find(segmentNo);
306 if (item == m_data.end()) {
307 if (isVerbose) {
308 std::cerr << "Requested segment [" << segmentNo << "] does not exist" << std::endl;
309 }
310 return;
311 }
312
Weiqi Shi5822e342014-08-21 20:05:30 -0700313 if (m_isFinished) {
314 uint64_t final = m_currentSegmentNo - 1;
315 item->second->setFinalBlockId(ndn::name::Component::fromSegment(final));
316
317 }
Shuo Chenccfbe242014-04-29 23:57:51 +0800318 m_face.put(*item->second);
319}
320
321void
322NdnPutFile::onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest)
323{
324 BOOST_ASSERT(prefix == m_dataPrefix);
325
326 if (prefix != interest.getName()) {
327 if (isVerbose) {
328 std::cerr << "Received unexpected interest " << interest << std::endl;
329 }
330 return;
331 }
332
333 uint8_t buffer[DEFAULT_BLOCK_SIZE];
334 std::streamsize readSize =
335 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
336
337 if (readSize <= 0) {
338 throw Error("Error reading from the input stream");
339 }
340
341 if (insertStream->peek() != std::istream::traits_type::eof()) {
342 throw Error("Input data does not fit into one Data packet");
343 }
344
Wentao Shanga8f3c402014-10-30 14:03:27 -0700345 shared_ptr<ndn::Data> data = make_shared<ndn::Data>(m_dataPrefix);
Weiqi Shi5822e342014-08-21 20:05:30 -0700346 data->setContent(buffer, readSize);
347 data->setFreshnessPeriod(freshnessPeriod);
348 signData(*data);
349 m_face.put(*data);
Shuo Chenccfbe242014-04-29 23:57:51 +0800350
351 m_isFinished = true;
352}
353
354void
355NdnPutFile::onRegisterFailed(const ndn::Name& prefix, const std::string& reason)
356{
357 throw Error("onRegisterFailed: " + reason);
358}
359
360void
361NdnPutFile::stopProcess()
362{
363 m_face.getIoService().stop();
364}
365
366void
367NdnPutFile::signData(ndn::Data& data)
368{
369 if (useDigestSha256) {
370 m_keyChain.signWithSha256(data);
371 }
372 else {
373 if (identityForData.empty())
374 m_keyChain.sign(data);
375 else {
376 ndn::Name keyName = m_keyChain.getDefaultKeyNameForIdentity(ndn::Name(identityForData));
377 ndn::Name certName = m_keyChain.getDefaultCertificateNameForKey(keyName);
378 m_keyChain.sign(data, certName);
379 }
380 }
381}
382
383void
384NdnPutFile::startCheckCommand()
385{
386 ndn::Interest checkInterest = generateCommandInterest(repoPrefix, "insert check",
387 RepoCommandParameter()
388 .setProcessId(m_processId));
389 m_face.expressInterest(checkInterest,
Wentao Shanga8f3c402014-10-30 14:03:27 -0700390 bind(&NdnPutFile::onCheckCommandResponse, this, _1, _2),
391 bind(&NdnPutFile::onCheckCommandTimeout, this, _1));
Shuo Chenccfbe242014-04-29 23:57:51 +0800392}
393
394void
395NdnPutFile::onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data)
396{
397 RepoCommandResponse response(data.getContent().blockFromValue());
398 int statusCode = response.getStatusCode();
399 if (statusCode >= 400) {
400 throw Error("Insert check command failed with code: " +
401 boost::lexical_cast<std::string>(statusCode));
402 }
403
404 if (m_isFinished) {
405 uint64_t insertCount = response.getInsertNum();
406
407 if (isSingle) {
408 if (insertCount == 1) {
409 m_face.getIoService().stop();
410 return;
411 }
412 }
413 // Technically, the check should not infer, but directly has signal from repo that
414 // write operation has been finished
415
416 if (insertCount == m_currentSegmentNo) {
417 m_face.getIoService().stop();
418 return;
419 }
420 }
421
422 m_scheduler.scheduleEvent(m_checkPeriod,
Wentao Shanga8f3c402014-10-30 14:03:27 -0700423 bind(&NdnPutFile::startCheckCommand, this));
Shuo Chenccfbe242014-04-29 23:57:51 +0800424}
425
426void
427NdnPutFile::onCheckCommandTimeout(const ndn::Interest& interest)
428{
429 throw Error("check response timeout");
430}
431
432ndn::Interest
433NdnPutFile::generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
434 const RepoCommandParameter& commandParameter)
435{
436 ndn::Interest interest(ndn::Name(commandPrefix)
437 .append(command)
438 .append(commandParameter.wireEncode()));
439 interest.setInterestLifetime(interestLifetime);
440
441 if (identityForCommand.empty())
442 m_generator.generate(interest);
443 else {
444 m_generator.generateWithIdentity(interest, ndn::Name(identityForCommand));
445 }
446
447 return interest;
448}
449
450static void
451usage()
452{
453 fprintf(stderr,
454 "ndnputfile [-u] [-s] [-D] [-d] [-i identity] [-I identity]"
455 " [-x freshness] [-l lifetime] [-w timeout] repo-prefix ndn-name filename\n"
456 "\n"
457 " Write a file into a repo.\n"
458 " -u: unversioned: do not add a version component\n"
459 " -s: single: do not add version or segment component, implies -u\n"
460 " -D: use DigestSha256 signing method instead of SignatureSha256WithRsa\n"
461 " -i: specify identity used for signing Data\n"
462 " -I: specify identity used for signing commands\n"
463 " -x: FreshnessPeriod in milliseconds\n"
464 " -l: InterestLifetime in milliseconds for each command\n"
465 " -w: timeout in milliseconds for whole process (default unlimited)\n"
466 " -v: be verbose\n"
467 " repo-prefix: repo command prefix\n"
468 " ndn-name: NDN Name prefix for written Data\n"
469 " filename: local file name; \"-\" reads from stdin\n"
470 );
471 exit(1);
472}
473
474int
475main(int argc, char** argv)
476{
477 NdnPutFile ndnPutFile;
478 int opt;
479 while ((opt = getopt(argc, argv, "usDi:I:x:l:w:vh")) != -1) {
480 switch (opt) {
481 case 'u':
482 ndnPutFile.isUnversioned = true;
483 break;
484 case 's':
485 ndnPutFile.isSingle = true;
486 break;
487 case 'D':
488 ndnPutFile.useDigestSha256 = true;
489 break;
490 case 'i':
491 ndnPutFile.identityForData = std::string(optarg);
492 break;
493 case 'I':
494 ndnPutFile.identityForCommand = std::string(optarg);
495 break;
496 case 'x':
497 try {
498 ndnPutFile.freshnessPeriod = milliseconds(boost::lexical_cast<uint64_t>(optarg));
499 }
500 catch (boost::bad_lexical_cast&) {
501 std::cerr << "-x option should be an integer.";
502 return 1;
503 }
504 break;
505 case 'l':
506 try {
507 ndnPutFile.interestLifetime = milliseconds(boost::lexical_cast<uint64_t>(optarg));
508 }
509 catch (boost::bad_lexical_cast&) {
510 std::cerr << "-l option should be an integer.";
511 return 1;
512 }
513 break;
514 case 'w':
515 ndnPutFile.hasTimeout = true;
516 try {
517 ndnPutFile.timeout = milliseconds(boost::lexical_cast<uint64_t>(optarg));
518 }
519 catch (boost::bad_lexical_cast&) {
520 std::cerr << "-w option should be an integer.";
521 return 1;
522 }
523 break;
524 case 'v':
525 ndnPutFile.isVerbose = true;
526 break;
527 case 'h':
528 usage();
529 break;
530 default:
531 break;
532 }
533 }
534
535 argc -= optind;
536 argv += optind;
537
538 if (argc != 3)
539 usage();
540
541 ndnPutFile.repoPrefix = Name(argv[0]);
542 ndnPutFile.ndnName = Name(argv[1]);
543 if (strcmp(argv[2], "-") == 0) {
544
545 ndnPutFile.insertStream = &std::cin;
546 ndnPutFile.run();
547 }
548 else {
549 std::ifstream inputFileStream(argv[2], std::ios::in | std::ios::binary);
550 if (!inputFileStream.is_open()) {
551 std::cerr << "ERROR: cannot open " << argv[2] << std::endl;
552 return 1;
553 }
554
555 ndnPutFile.insertStream = &inputFileStream;
556 ndnPutFile.run();
557 }
558
559 // ndnPutFile MUST NOT be used anymore because .insertStream is a dangling pointer
560
561 return 0;
562}
563
564} // namespace repo
565
566int
567main(int argc, char** argv)
568{
569 try {
570 return repo::main(argc, argv);
571 }
572 catch (std::exception& e) {
573 std::cerr << "ERROR: " << e.what() << std::endl;
574 return 2;
575 }
576}