blob: 2103eb1ef5646fcfe0fd02bd945fabd202bc6e59 [file] [log] [blame]
Shuo Chenccfbe242014-04-29 23:57:51 +08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. if (not, see <http://www.gnu.org/licenses/>.
18 */
19
Alexander Afanasyev39d98072014-05-04 12:46:29 -070020#include "../src/repo-command-parameter.hpp"
21#include "../src/repo-command-response.hpp"
Shuo Chenccfbe242014-04-29 23:57:51 +080022
23#include <ndn-cxx/face.hpp>
24#include <ndn-cxx/security/key-chain.hpp>
Wentao Shang91fb4f22014-05-20 10:55:22 -070025#include <ndn-cxx/util/scheduler.hpp>
Shuo Chenccfbe242014-04-29 23:57:51 +080026#include <ndn-cxx/util/command-interest-generator.hpp>
27#include <fstream>
28#include <string>
29#include <stdlib.h>
Wentao Shang91fb4f22014-05-20 10:55:22 -070030#include <stdint.h>
Shuo Chenccfbe242014-04-29 23:57:51 +080031#include <boost/filesystem.hpp>
Wentao Shang91fb4f22014-05-20 10:55:22 -070032#include <boost/lexical_cast.hpp>
33#include <boost/asio.hpp>
Shuo Chenccfbe242014-04-29 23:57:51 +080034#include <boost/iostreams/operations.hpp>
35#include <boost/iostreams/read.hpp>
36
37namespace repo {
38
39using namespace ndn::time;
40
41static const uint64_t DEFAULT_BLOCK_SIZE = 1000;
42static const uint64_t DEFAULT_INTEREST_LIFETIME = 4000;
43static const uint64_t DEFAULT_FRESHNESS_PERIOD = 10000;
44static const uint64_t DEFAULT_CHECK_PERIOD = 1000;
45static const size_t PRE_SIGN_DATA_COUNT = 10;
46
47class NdnPutFile : ndn::noncopyable
48{
49public:
50 class Error : public std::runtime_error
51 {
52 public:
53 explicit
54 Error(const std::string& what)
55 : std::runtime_error(what)
56 {
57 }
58 };
59
60 NdnPutFile()
61 : isUnversioned(false)
62 , isSingle(false)
63 , useDigestSha256(false)
64 , freshnessPeriod(DEFAULT_FRESHNESS_PERIOD)
65 , interestLifetime(DEFAULT_INTEREST_LIFETIME)
66 , hasTimeout(false)
67 , timeout(0)
68 , insertStream(0)
69 , isVerbose(false)
70
71 , m_scheduler(m_face.getIoService())
72 , m_timestampVersion(toUnixTimestamp(system_clock::now()).count())
73 , m_processId(0)
74 , m_checkPeriod(DEFAULT_CHECK_PERIOD)
75 , m_currentSegmentNo(0)
76 , m_isFinished(false)
77 {
78 }
79
80 void
81 run();
82
83private:
84 void
85 prepareNextData(uint64_t referenceSegmentNo);
86
87 void
88 startInsertCommand();
89
90 void
91 onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data);
92
93 void
94 onInsertCommandTimeout(const ndn::Interest& interest);
95
96 void
97 onInterest(const ndn::Name& prefix, const ndn::Interest& interest);
98
99 void
100 onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest);
101
102 void
Wentao Shang91fb4f22014-05-20 10:55:22 -0700103 onRegisterSuccess(const ndn::Name& prefix);
104
105 void
Shuo Chenccfbe242014-04-29 23:57:51 +0800106 onRegisterFailed(const ndn::Name& prefix, const std::string& reason);
107
108 void
109 stopProcess();
110
111 void
112 signData(ndn::Data& data);
113
114 void
115 startCheckCommand();
116
117 void
118 onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data);
119
120 void
121 onCheckCommandTimeout(const ndn::Interest& interest);
122
123 ndn::Interest
124 generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
125 const RepoCommandParameter& commandParameter);
126
127public:
128 bool isUnversioned;
129 bool isSingle;
130 bool useDigestSha256;
131 std::string identityForData;
132 std::string identityForCommand;
133 milliseconds freshnessPeriod;
134 milliseconds interestLifetime;
135 bool hasTimeout;
136 milliseconds timeout;
137 ndn::Name repoPrefix;
138 ndn::Name ndnName;
139 std::istream* insertStream;
140 bool isVerbose;
141
142private:
143 ndn::Face m_face;
144 ndn::Scheduler m_scheduler;
145 ndn::KeyChain m_keyChain;
146 ndn::CommandInterestGenerator m_generator;
147 uint64_t m_timestampVersion;
148 uint64_t m_processId;
149 milliseconds m_checkPeriod;
150 size_t m_currentSegmentNo;
151 bool m_isFinished;
152 ndn::Name m_dataPrefix;
153
154 typedef std::map<uint64_t, ndn::shared_ptr<ndn::Data> > DataContainer;
155 DataContainer m_data;
156};
157
158void
159NdnPutFile::prepareNextData(uint64_t referenceSegmentNo)
160{
161 // make sure m_data has [referenceSegmentNo, referenceSegmentNo + PRE_SIGN_DATA_COUNT] Data
162 if (m_isFinished)
163 return;
164
165 size_t nDataToPrepare = PRE_SIGN_DATA_COUNT;
166
167 if (!m_data.empty()) {
168 uint64_t maxSegmentNo = m_data.rbegin()->first;
169
170 if (maxSegmentNo - referenceSegmentNo >= nDataToPrepare) {
171 // nothing to prepare
172 return;
173 }
174
175 nDataToPrepare -= maxSegmentNo - referenceSegmentNo;
176 }
177
178 for (size_t i = 0; i < nDataToPrepare && !m_isFinished; ++i) {
179 uint8_t buffer[DEFAULT_BLOCK_SIZE];
180
181 std::streamsize readSize =
182 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
183
184 if (readSize <= 0) {
185 throw Error("Error reading from the input stream");
186 }
187
188 ndn::shared_ptr<ndn::Data> data =
189 ndn::make_shared<ndn::Data>(Name(m_dataPrefix)
190 .appendSegment(m_currentSegmentNo));
191
192 if (insertStream->peek() == std::istream::traits_type::eof()) {
193 data->setFinalBlockId(ndn::name::Component::fromNumber(m_currentSegmentNo));
194 m_isFinished = true;
195 }
196
197 data->setContent(buffer, readSize);
198 data->setFreshnessPeriod(freshnessPeriod);
199 signData(*data);
200
201 m_data.insert(std::make_pair(m_currentSegmentNo, data));
202
203 ++m_currentSegmentNo;
204 }
205}
206
207void
208NdnPutFile::run()
209{
210 m_dataPrefix = ndnName;
211 if (!isUnversioned)
212 m_dataPrefix.appendVersion(m_timestampVersion);
213
214 if (isVerbose)
215 std::cerr << "setInterestFilter for " << m_dataPrefix << std::endl;
216 m_face.setInterestFilter(m_dataPrefix,
217 isSingle ?
218 ndn::bind(&NdnPutFile::onSingleInterest, this, _1, _2)
219 :
220 ndn::bind(&NdnPutFile::onInterest, this, _1, _2),
Wentao Shang91fb4f22014-05-20 10:55:22 -0700221 ndn::bind(&NdnPutFile::onRegisterSuccess, this, _1),
Shuo Chenccfbe242014-04-29 23:57:51 +0800222 ndn::bind(&NdnPutFile::onRegisterFailed, this, _1, _2));
223
Shuo Chenccfbe242014-04-29 23:57:51 +0800224 if (hasTimeout)
225 m_scheduler.scheduleEvent(timeout, ndn::bind(&NdnPutFile::stopProcess, this));
226
227 m_face.processEvents();
228}
229
230void
Wentao Shang91fb4f22014-05-20 10:55:22 -0700231NdnPutFile::onRegisterSuccess(const Name& prefix)
232{
233 startInsertCommand();
234}
235
236void
Shuo Chenccfbe242014-04-29 23:57:51 +0800237NdnPutFile::startInsertCommand()
238{
239 RepoCommandParameter parameters;
240 parameters.setName(m_dataPrefix);
241 if (!isSingle) {
242 parameters.setStartBlockId(0);
243 }
244
245 ndn::Interest commandInterest = generateCommandInterest(repoPrefix, "insert", parameters);
246 m_face.expressInterest(commandInterest,
247 ndn::bind(&NdnPutFile::onInsertCommandResponse, this, _1, _2),
248 ndn::bind(&NdnPutFile::onInsertCommandTimeout, this, _1));
249}
250
251void
252NdnPutFile::onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data)
253{
254 RepoCommandResponse response(data.getContent().blockFromValue());
255 int statusCode = response.getStatusCode();
256 if (statusCode >= 400) {
257 throw Error("insert command failed with code " +
258 boost::lexical_cast<std::string>(statusCode));
259 }
260 m_processId = response.getProcessId();
261
262 m_scheduler.scheduleEvent(m_checkPeriod,
263 ndn::bind(&NdnPutFile::startCheckCommand, this));
264}
265
266void
267NdnPutFile::onInsertCommandTimeout(const ndn::Interest& interest)
268{
269 throw Error("command response timeout");
270}
271
272void
273NdnPutFile::onInterest(const ndn::Name& prefix, const ndn::Interest& interest)
274{
275 if (interest.getName().size() != prefix.size() + 1) {
276 if (isVerbose) {
277 std::cerr << "Error processing incoming interest " << interest << ": "
278 << "Unrecognized Interest" << std::endl;
279 }
280 return;
281 }
282
283 uint64_t segmentNo;
284 try {
285 ndn::Name::Component segmentComponent = interest.getName().get(prefix.size());
286 segmentNo = segmentComponent.toSegment();
287 }
288 catch (tlv::Error& e) {
289 if (isVerbose) {
290 std::cerr << "Error processing incoming interest " << interest << ": "
291 << e.what() << std::endl;
292 }
293 return;
294 }
295
296 prepareNextData(segmentNo);
297
298 DataContainer::iterator item = m_data.find(segmentNo);
299 if (item == m_data.end()) {
300 if (isVerbose) {
301 std::cerr << "Requested segment [" << segmentNo << "] does not exist" << std::endl;
302 }
303 return;
304 }
305
306 m_face.put(*item->second);
307}
308
309void
310NdnPutFile::onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest)
311{
312 BOOST_ASSERT(prefix == m_dataPrefix);
313
314 if (prefix != interest.getName()) {
315 if (isVerbose) {
316 std::cerr << "Received unexpected interest " << interest << std::endl;
317 }
318 return;
319 }
320
321 uint8_t buffer[DEFAULT_BLOCK_SIZE];
322 std::streamsize readSize =
323 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
324
325 if (readSize <= 0) {
326 throw Error("Error reading from the input stream");
327 }
328
329 if (insertStream->peek() != std::istream::traits_type::eof()) {
330 throw Error("Input data does not fit into one Data packet");
331 }
332
333 ndn::Data data(m_dataPrefix);
334 data.setContent(buffer, readSize);
335 data.setFreshnessPeriod(freshnessPeriod);
336 signData(data);
337 m_face.put(data);
338
339 m_isFinished = true;
340}
341
342void
343NdnPutFile::onRegisterFailed(const ndn::Name& prefix, const std::string& reason)
344{
345 throw Error("onRegisterFailed: " + reason);
346}
347
348void
349NdnPutFile::stopProcess()
350{
351 m_face.getIoService().stop();
352}
353
354void
355NdnPutFile::signData(ndn::Data& data)
356{
357 if (useDigestSha256) {
358 m_keyChain.signWithSha256(data);
359 }
360 else {
361 if (identityForData.empty())
362 m_keyChain.sign(data);
363 else {
364 ndn::Name keyName = m_keyChain.getDefaultKeyNameForIdentity(ndn::Name(identityForData));
365 ndn::Name certName = m_keyChain.getDefaultCertificateNameForKey(keyName);
366 m_keyChain.sign(data, certName);
367 }
368 }
369}
370
371void
372NdnPutFile::startCheckCommand()
373{
374 ndn::Interest checkInterest = generateCommandInterest(repoPrefix, "insert check",
375 RepoCommandParameter()
376 .setProcessId(m_processId));
377 m_face.expressInterest(checkInterest,
378 ndn::bind(&NdnPutFile::onCheckCommandResponse, this, _1, _2),
379 ndn::bind(&NdnPutFile::onCheckCommandTimeout, this, _1));
380}
381
382void
383NdnPutFile::onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data)
384{
385 RepoCommandResponse response(data.getContent().blockFromValue());
386 int statusCode = response.getStatusCode();
387 if (statusCode >= 400) {
388 throw Error("Insert check command failed with code: " +
389 boost::lexical_cast<std::string>(statusCode));
390 }
391
392 if (m_isFinished) {
393 uint64_t insertCount = response.getInsertNum();
394
395 if (isSingle) {
396 if (insertCount == 1) {
397 m_face.getIoService().stop();
398 return;
399 }
400 }
401 // Technically, the check should not infer, but directly has signal from repo that
402 // write operation has been finished
403
404 if (insertCount == m_currentSegmentNo) {
405 m_face.getIoService().stop();
406 return;
407 }
408 }
409
410 m_scheduler.scheduleEvent(m_checkPeriod,
411 ndn::bind(&NdnPutFile::startCheckCommand, this));
412}
413
414void
415NdnPutFile::onCheckCommandTimeout(const ndn::Interest& interest)
416{
417 throw Error("check response timeout");
418}
419
420ndn::Interest
421NdnPutFile::generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
422 const RepoCommandParameter& commandParameter)
423{
424 ndn::Interest interest(ndn::Name(commandPrefix)
425 .append(command)
426 .append(commandParameter.wireEncode()));
427 interest.setInterestLifetime(interestLifetime);
428
429 if (identityForCommand.empty())
430 m_generator.generate(interest);
431 else {
432 m_generator.generateWithIdentity(interest, ndn::Name(identityForCommand));
433 }
434
435 return interest;
436}
437
438static void
439usage()
440{
441 fprintf(stderr,
442 "ndnputfile [-u] [-s] [-D] [-d] [-i identity] [-I identity]"
443 " [-x freshness] [-l lifetime] [-w timeout] repo-prefix ndn-name filename\n"
444 "\n"
445 " Write a file into a repo.\n"
446 " -u: unversioned: do not add a version component\n"
447 " -s: single: do not add version or segment component, implies -u\n"
448 " -D: use DigestSha256 signing method instead of SignatureSha256WithRsa\n"
449 " -i: specify identity used for signing Data\n"
450 " -I: specify identity used for signing commands\n"
451 " -x: FreshnessPeriod in milliseconds\n"
452 " -l: InterestLifetime in milliseconds for each command\n"
453 " -w: timeout in milliseconds for whole process (default unlimited)\n"
454 " -v: be verbose\n"
455 " repo-prefix: repo command prefix\n"
456 " ndn-name: NDN Name prefix for written Data\n"
457 " filename: local file name; \"-\" reads from stdin\n"
458 );
459 exit(1);
460}
461
462int
463main(int argc, char** argv)
464{
465 NdnPutFile ndnPutFile;
466 int opt;
467 while ((opt = getopt(argc, argv, "usDi:I:x:l:w:vh")) != -1) {
468 switch (opt) {
469 case 'u':
470 ndnPutFile.isUnversioned = true;
471 break;
472 case 's':
473 ndnPutFile.isSingle = true;
474 break;
475 case 'D':
476 ndnPutFile.useDigestSha256 = true;
477 break;
478 case 'i':
479 ndnPutFile.identityForData = std::string(optarg);
480 break;
481 case 'I':
482 ndnPutFile.identityForCommand = std::string(optarg);
483 break;
484 case 'x':
485 try {
486 ndnPutFile.freshnessPeriod = milliseconds(boost::lexical_cast<uint64_t>(optarg));
487 }
488 catch (boost::bad_lexical_cast&) {
489 std::cerr << "-x option should be an integer.";
490 return 1;
491 }
492 break;
493 case 'l':
494 try {
495 ndnPutFile.interestLifetime = milliseconds(boost::lexical_cast<uint64_t>(optarg));
496 }
497 catch (boost::bad_lexical_cast&) {
498 std::cerr << "-l option should be an integer.";
499 return 1;
500 }
501 break;
502 case 'w':
503 ndnPutFile.hasTimeout = true;
504 try {
505 ndnPutFile.timeout = milliseconds(boost::lexical_cast<uint64_t>(optarg));
506 }
507 catch (boost::bad_lexical_cast&) {
508 std::cerr << "-w option should be an integer.";
509 return 1;
510 }
511 break;
512 case 'v':
513 ndnPutFile.isVerbose = true;
514 break;
515 case 'h':
516 usage();
517 break;
518 default:
519 break;
520 }
521 }
522
523 argc -= optind;
524 argv += optind;
525
526 if (argc != 3)
527 usage();
528
529 ndnPutFile.repoPrefix = Name(argv[0]);
530 ndnPutFile.ndnName = Name(argv[1]);
531 if (strcmp(argv[2], "-") == 0) {
532
533 ndnPutFile.insertStream = &std::cin;
534 ndnPutFile.run();
535 }
536 else {
537 std::ifstream inputFileStream(argv[2], std::ios::in | std::ios::binary);
538 if (!inputFileStream.is_open()) {
539 std::cerr << "ERROR: cannot open " << argv[2] << std::endl;
540 return 1;
541 }
542
543 ndnPutFile.insertStream = &inputFileStream;
544 ndnPutFile.run();
545 }
546
547 // ndnPutFile MUST NOT be used anymore because .insertStream is a dangling pointer
548
549 return 0;
550}
551
552} // namespace repo
553
554int
555main(int argc, char** argv)
556{
557 try {
558 return repo::main(argc, argv);
559 }
560 catch (std::exception& e) {
561 std::cerr << "ERROR: " << e.what() << std::endl;
562 return 2;
563 }
564}