blob: ffe5519abe53f1c402eb3d17cbad91ea095cdb94 [file] [log] [blame]
Shuo Chenccfbe242014-04-29 23:57:51 +08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. if (not, see <http://www.gnu.org/licenses/>.
18 */
19
Alexander Afanasyev39d98072014-05-04 12:46:29 -070020#include "../src/repo-command-parameter.hpp"
21#include "../src/repo-command-response.hpp"
Shuo Chenccfbe242014-04-29 23:57:51 +080022
23#include <ndn-cxx/face.hpp>
24#include <ndn-cxx/security/key-chain.hpp>
Wentao Shang91fb4f22014-05-20 10:55:22 -070025#include <ndn-cxx/util/scheduler.hpp>
Shuo Chenccfbe242014-04-29 23:57:51 +080026#include <ndn-cxx/util/command-interest-generator.hpp>
27#include <fstream>
28#include <string>
29#include <stdlib.h>
Wentao Shang91fb4f22014-05-20 10:55:22 -070030#include <stdint.h>
Shuo Chenccfbe242014-04-29 23:57:51 +080031#include <boost/filesystem.hpp>
Wentao Shang91fb4f22014-05-20 10:55:22 -070032#include <boost/lexical_cast.hpp>
33#include <boost/asio.hpp>
Shuo Chenccfbe242014-04-29 23:57:51 +080034#include <boost/iostreams/operations.hpp>
35#include <boost/iostreams/read.hpp>
36
37namespace repo {
38
39using namespace ndn::time;
40
41static const uint64_t DEFAULT_BLOCK_SIZE = 1000;
42static const uint64_t DEFAULT_INTEREST_LIFETIME = 4000;
43static const uint64_t DEFAULT_FRESHNESS_PERIOD = 10000;
44static const uint64_t DEFAULT_CHECK_PERIOD = 1000;
Weiqi Shi5822e342014-08-21 20:05:30 -070045static const size_t PRE_SIGN_DATA_COUNT = 11;
Shuo Chenccfbe242014-04-29 23:57:51 +080046
47class NdnPutFile : ndn::noncopyable
48{
49public:
50 class Error : public std::runtime_error
51 {
52 public:
53 explicit
54 Error(const std::string& what)
55 : std::runtime_error(what)
56 {
57 }
58 };
59
60 NdnPutFile()
61 : isUnversioned(false)
62 , isSingle(false)
63 , useDigestSha256(false)
64 , freshnessPeriod(DEFAULT_FRESHNESS_PERIOD)
65 , interestLifetime(DEFAULT_INTEREST_LIFETIME)
66 , hasTimeout(false)
67 , timeout(0)
68 , insertStream(0)
69 , isVerbose(false)
70
71 , m_scheduler(m_face.getIoService())
72 , m_timestampVersion(toUnixTimestamp(system_clock::now()).count())
73 , m_processId(0)
74 , m_checkPeriod(DEFAULT_CHECK_PERIOD)
75 , m_currentSegmentNo(0)
76 , m_isFinished(false)
77 {
78 }
79
80 void
81 run();
82
83private:
84 void
85 prepareNextData(uint64_t referenceSegmentNo);
86
87 void
88 startInsertCommand();
89
90 void
91 onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data);
92
93 void
94 onInsertCommandTimeout(const ndn::Interest& interest);
95
96 void
97 onInterest(const ndn::Name& prefix, const ndn::Interest& interest);
98
99 void
100 onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest);
101
102 void
Wentao Shang91fb4f22014-05-20 10:55:22 -0700103 onRegisterSuccess(const ndn::Name& prefix);
104
105 void
Shuo Chenccfbe242014-04-29 23:57:51 +0800106 onRegisterFailed(const ndn::Name& prefix, const std::string& reason);
107
108 void
109 stopProcess();
110
111 void
112 signData(ndn::Data& data);
113
114 void
115 startCheckCommand();
116
117 void
118 onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data);
119
120 void
121 onCheckCommandTimeout(const ndn::Interest& interest);
122
123 ndn::Interest
124 generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
125 const RepoCommandParameter& commandParameter);
126
127public:
128 bool isUnversioned;
129 bool isSingle;
130 bool useDigestSha256;
131 std::string identityForData;
132 std::string identityForCommand;
133 milliseconds freshnessPeriod;
134 milliseconds interestLifetime;
135 bool hasTimeout;
136 milliseconds timeout;
137 ndn::Name repoPrefix;
138 ndn::Name ndnName;
139 std::istream* insertStream;
140 bool isVerbose;
141
142private:
143 ndn::Face m_face;
144 ndn::Scheduler m_scheduler;
145 ndn::KeyChain m_keyChain;
146 ndn::CommandInterestGenerator m_generator;
147 uint64_t m_timestampVersion;
148 uint64_t m_processId;
149 milliseconds m_checkPeriod;
150 size_t m_currentSegmentNo;
151 bool m_isFinished;
152 ndn::Name m_dataPrefix;
153
154 typedef std::map<uint64_t, ndn::shared_ptr<ndn::Data> > DataContainer;
155 DataContainer m_data;
156};
157
158void
159NdnPutFile::prepareNextData(uint64_t referenceSegmentNo)
160{
161 // make sure m_data has [referenceSegmentNo, referenceSegmentNo + PRE_SIGN_DATA_COUNT] Data
162 if (m_isFinished)
163 return;
164
165 size_t nDataToPrepare = PRE_SIGN_DATA_COUNT;
166
167 if (!m_data.empty()) {
168 uint64_t maxSegmentNo = m_data.rbegin()->first;
169
170 if (maxSegmentNo - referenceSegmentNo >= nDataToPrepare) {
171 // nothing to prepare
172 return;
173 }
174
175 nDataToPrepare -= maxSegmentNo - referenceSegmentNo;
176 }
177
178 for (size_t i = 0; i < nDataToPrepare && !m_isFinished; ++i) {
179 uint8_t buffer[DEFAULT_BLOCK_SIZE];
180
181 std::streamsize readSize =
182 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
183
184 if (readSize <= 0) {
185 throw Error("Error reading from the input stream");
186 }
187
188 ndn::shared_ptr<ndn::Data> data =
189 ndn::make_shared<ndn::Data>(Name(m_dataPrefix)
190 .appendSegment(m_currentSegmentNo));
191
192 if (insertStream->peek() == std::istream::traits_type::eof()) {
Weiqi Shi5822e342014-08-21 20:05:30 -0700193 data->setFinalBlockId(ndn::name::Component::fromSegment(m_currentSegmentNo));
Shuo Chenccfbe242014-04-29 23:57:51 +0800194 m_isFinished = true;
195 }
196
197 data->setContent(buffer, readSize);
198 data->setFreshnessPeriod(freshnessPeriod);
199 signData(*data);
200
201 m_data.insert(std::make_pair(m_currentSegmentNo, data));
202
203 ++m_currentSegmentNo;
204 }
205}
206
207void
208NdnPutFile::run()
209{
210 m_dataPrefix = ndnName;
211 if (!isUnversioned)
212 m_dataPrefix.appendVersion(m_timestampVersion);
213
214 if (isVerbose)
215 std::cerr << "setInterestFilter for " << m_dataPrefix << std::endl;
216 m_face.setInterestFilter(m_dataPrefix,
217 isSingle ?
218 ndn::bind(&NdnPutFile::onSingleInterest, this, _1, _2)
219 :
220 ndn::bind(&NdnPutFile::onInterest, this, _1, _2),
Wentao Shang91fb4f22014-05-20 10:55:22 -0700221 ndn::bind(&NdnPutFile::onRegisterSuccess, this, _1),
Shuo Chenccfbe242014-04-29 23:57:51 +0800222 ndn::bind(&NdnPutFile::onRegisterFailed, this, _1, _2));
223
Weiqi Shi5822e342014-08-21 20:05:30 -0700224
Shuo Chenccfbe242014-04-29 23:57:51 +0800225 if (hasTimeout)
226 m_scheduler.scheduleEvent(timeout, ndn::bind(&NdnPutFile::stopProcess, this));
227
228 m_face.processEvents();
229}
230
231void
Wentao Shang91fb4f22014-05-20 10:55:22 -0700232NdnPutFile::onRegisterSuccess(const Name& prefix)
233{
234 startInsertCommand();
235}
236
237void
Shuo Chenccfbe242014-04-29 23:57:51 +0800238NdnPutFile::startInsertCommand()
239{
240 RepoCommandParameter parameters;
241 parameters.setName(m_dataPrefix);
242 if (!isSingle) {
243 parameters.setStartBlockId(0);
244 }
245
246 ndn::Interest commandInterest = generateCommandInterest(repoPrefix, "insert", parameters);
247 m_face.expressInterest(commandInterest,
248 ndn::bind(&NdnPutFile::onInsertCommandResponse, this, _1, _2),
249 ndn::bind(&NdnPutFile::onInsertCommandTimeout, this, _1));
250}
251
252void
253NdnPutFile::onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data)
254{
255 RepoCommandResponse response(data.getContent().blockFromValue());
256 int statusCode = response.getStatusCode();
257 if (statusCode >= 400) {
258 throw Error("insert command failed with code " +
259 boost::lexical_cast<std::string>(statusCode));
260 }
261 m_processId = response.getProcessId();
262
263 m_scheduler.scheduleEvent(m_checkPeriod,
264 ndn::bind(&NdnPutFile::startCheckCommand, this));
265}
266
267void
268NdnPutFile::onInsertCommandTimeout(const ndn::Interest& interest)
269{
270 throw Error("command response timeout");
271}
272
273void
274NdnPutFile::onInterest(const ndn::Name& prefix, const ndn::Interest& interest)
275{
276 if (interest.getName().size() != prefix.size() + 1) {
277 if (isVerbose) {
278 std::cerr << "Error processing incoming interest " << interest << ": "
279 << "Unrecognized Interest" << std::endl;
280 }
281 return;
282 }
283
284 uint64_t segmentNo;
285 try {
286 ndn::Name::Component segmentComponent = interest.getName().get(prefix.size());
287 segmentNo = segmentComponent.toSegment();
288 }
289 catch (tlv::Error& e) {
290 if (isVerbose) {
291 std::cerr << "Error processing incoming interest " << interest << ": "
292 << e.what() << std::endl;
293 }
294 return;
295 }
296
297 prepareNextData(segmentNo);
298
299 DataContainer::iterator item = m_data.find(segmentNo);
300 if (item == m_data.end()) {
301 if (isVerbose) {
302 std::cerr << "Requested segment [" << segmentNo << "] does not exist" << std::endl;
303 }
304 return;
305 }
306
Weiqi Shi5822e342014-08-21 20:05:30 -0700307 if (m_isFinished) {
308 uint64_t final = m_currentSegmentNo - 1;
309 item->second->setFinalBlockId(ndn::name::Component::fromSegment(final));
310
311 }
Shuo Chenccfbe242014-04-29 23:57:51 +0800312 m_face.put(*item->second);
313}
314
315void
316NdnPutFile::onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest)
317{
318 BOOST_ASSERT(prefix == m_dataPrefix);
319
320 if (prefix != interest.getName()) {
321 if (isVerbose) {
322 std::cerr << "Received unexpected interest " << interest << std::endl;
323 }
324 return;
325 }
326
327 uint8_t buffer[DEFAULT_BLOCK_SIZE];
328 std::streamsize readSize =
329 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
330
331 if (readSize <= 0) {
332 throw Error("Error reading from the input stream");
333 }
334
335 if (insertStream->peek() != std::istream::traits_type::eof()) {
336 throw Error("Input data does not fit into one Data packet");
337 }
338
Weiqi Shi5822e342014-08-21 20:05:30 -0700339 ndn::shared_ptr<ndn::Data> data = ndn::make_shared<ndn::Data>(m_dataPrefix);
340 data->setContent(buffer, readSize);
341 data->setFreshnessPeriod(freshnessPeriod);
342 signData(*data);
343 m_face.put(*data);
Shuo Chenccfbe242014-04-29 23:57:51 +0800344
345 m_isFinished = true;
346}
347
348void
349NdnPutFile::onRegisterFailed(const ndn::Name& prefix, const std::string& reason)
350{
351 throw Error("onRegisterFailed: " + reason);
352}
353
354void
355NdnPutFile::stopProcess()
356{
357 m_face.getIoService().stop();
358}
359
360void
361NdnPutFile::signData(ndn::Data& data)
362{
363 if (useDigestSha256) {
364 m_keyChain.signWithSha256(data);
365 }
366 else {
367 if (identityForData.empty())
368 m_keyChain.sign(data);
369 else {
370 ndn::Name keyName = m_keyChain.getDefaultKeyNameForIdentity(ndn::Name(identityForData));
371 ndn::Name certName = m_keyChain.getDefaultCertificateNameForKey(keyName);
372 m_keyChain.sign(data, certName);
373 }
374 }
375}
376
377void
378NdnPutFile::startCheckCommand()
379{
380 ndn::Interest checkInterest = generateCommandInterest(repoPrefix, "insert check",
381 RepoCommandParameter()
382 .setProcessId(m_processId));
383 m_face.expressInterest(checkInterest,
384 ndn::bind(&NdnPutFile::onCheckCommandResponse, this, _1, _2),
385 ndn::bind(&NdnPutFile::onCheckCommandTimeout, this, _1));
386}
387
388void
389NdnPutFile::onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data)
390{
391 RepoCommandResponse response(data.getContent().blockFromValue());
392 int statusCode = response.getStatusCode();
393 if (statusCode >= 400) {
394 throw Error("Insert check command failed with code: " +
395 boost::lexical_cast<std::string>(statusCode));
396 }
397
398 if (m_isFinished) {
399 uint64_t insertCount = response.getInsertNum();
400
401 if (isSingle) {
402 if (insertCount == 1) {
403 m_face.getIoService().stop();
404 return;
405 }
406 }
407 // Technically, the check should not infer, but directly has signal from repo that
408 // write operation has been finished
409
410 if (insertCount == m_currentSegmentNo) {
411 m_face.getIoService().stop();
412 return;
413 }
414 }
415
416 m_scheduler.scheduleEvent(m_checkPeriod,
417 ndn::bind(&NdnPutFile::startCheckCommand, this));
418}
419
420void
421NdnPutFile::onCheckCommandTimeout(const ndn::Interest& interest)
422{
423 throw Error("check response timeout");
424}
425
426ndn::Interest
427NdnPutFile::generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
428 const RepoCommandParameter& commandParameter)
429{
430 ndn::Interest interest(ndn::Name(commandPrefix)
431 .append(command)
432 .append(commandParameter.wireEncode()));
433 interest.setInterestLifetime(interestLifetime);
434
435 if (identityForCommand.empty())
436 m_generator.generate(interest);
437 else {
438 m_generator.generateWithIdentity(interest, ndn::Name(identityForCommand));
439 }
440
441 return interest;
442}
443
444static void
445usage()
446{
447 fprintf(stderr,
448 "ndnputfile [-u] [-s] [-D] [-d] [-i identity] [-I identity]"
449 " [-x freshness] [-l lifetime] [-w timeout] repo-prefix ndn-name filename\n"
450 "\n"
451 " Write a file into a repo.\n"
452 " -u: unversioned: do not add a version component\n"
453 " -s: single: do not add version or segment component, implies -u\n"
454 " -D: use DigestSha256 signing method instead of SignatureSha256WithRsa\n"
455 " -i: specify identity used for signing Data\n"
456 " -I: specify identity used for signing commands\n"
457 " -x: FreshnessPeriod in milliseconds\n"
458 " -l: InterestLifetime in milliseconds for each command\n"
459 " -w: timeout in milliseconds for whole process (default unlimited)\n"
460 " -v: be verbose\n"
461 " repo-prefix: repo command prefix\n"
462 " ndn-name: NDN Name prefix for written Data\n"
463 " filename: local file name; \"-\" reads from stdin\n"
464 );
465 exit(1);
466}
467
468int
469main(int argc, char** argv)
470{
471 NdnPutFile ndnPutFile;
472 int opt;
473 while ((opt = getopt(argc, argv, "usDi:I:x:l:w:vh")) != -1) {
474 switch (opt) {
475 case 'u':
476 ndnPutFile.isUnversioned = true;
477 break;
478 case 's':
479 ndnPutFile.isSingle = true;
480 break;
481 case 'D':
482 ndnPutFile.useDigestSha256 = true;
483 break;
484 case 'i':
485 ndnPutFile.identityForData = std::string(optarg);
486 break;
487 case 'I':
488 ndnPutFile.identityForCommand = std::string(optarg);
489 break;
490 case 'x':
491 try {
492 ndnPutFile.freshnessPeriod = milliseconds(boost::lexical_cast<uint64_t>(optarg));
493 }
494 catch (boost::bad_lexical_cast&) {
495 std::cerr << "-x option should be an integer.";
496 return 1;
497 }
498 break;
499 case 'l':
500 try {
501 ndnPutFile.interestLifetime = milliseconds(boost::lexical_cast<uint64_t>(optarg));
502 }
503 catch (boost::bad_lexical_cast&) {
504 std::cerr << "-l option should be an integer.";
505 return 1;
506 }
507 break;
508 case 'w':
509 ndnPutFile.hasTimeout = true;
510 try {
511 ndnPutFile.timeout = milliseconds(boost::lexical_cast<uint64_t>(optarg));
512 }
513 catch (boost::bad_lexical_cast&) {
514 std::cerr << "-w option should be an integer.";
515 return 1;
516 }
517 break;
518 case 'v':
519 ndnPutFile.isVerbose = true;
520 break;
521 case 'h':
522 usage();
523 break;
524 default:
525 break;
526 }
527 }
528
529 argc -= optind;
530 argv += optind;
531
532 if (argc != 3)
533 usage();
534
535 ndnPutFile.repoPrefix = Name(argv[0]);
536 ndnPutFile.ndnName = Name(argv[1]);
537 if (strcmp(argv[2], "-") == 0) {
538
539 ndnPutFile.insertStream = &std::cin;
540 ndnPutFile.run();
541 }
542 else {
543 std::ifstream inputFileStream(argv[2], std::ios::in | std::ios::binary);
544 if (!inputFileStream.is_open()) {
545 std::cerr << "ERROR: cannot open " << argv[2] << std::endl;
546 return 1;
547 }
548
549 ndnPutFile.insertStream = &inputFileStream;
550 ndnPutFile.run();
551 }
552
553 // ndnPutFile MUST NOT be used anymore because .insertStream is a dangling pointer
554
555 return 0;
556}
557
558} // namespace repo
559
560int
561main(int argc, char** argv)
562{
563 try {
564 return repo::main(argc, argv);
565 }
566 catch (std::exception& e) {
567 std::cerr << "ERROR: " << e.what() << std::endl;
568 return 2;
569 }
570}