blob: 1c5693400b29505cec4b0ec44b28aab61ad81658 [file] [log] [blame]
Shuo Chenccfbe242014-04-29 23:57:51 +08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 */
19
Alexander Afanasyev39d98072014-05-04 12:46:29 -070020#include "../src/repo-command-parameter.hpp"
21#include "../src/repo-command-response.hpp"
Shuo Chenccfbe242014-04-29 23:57:51 +080022
23#include <ndn-cxx/face.hpp>
24#include <ndn-cxx/security/key-chain.hpp>
25#include <ndn-cxx/util/command-interest-generator.hpp>
26#include <fstream>
27#include <string>
28#include <stdlib.h>
29#include <boost/filesystem.hpp>
30
31#include <boost/iostreams/operations.hpp>
32#include <boost/iostreams/read.hpp>
33
34namespace repo {
35
36using namespace ndn::time;
37
// Number of bytes read from the input stream for the content of one Data packet.
static const uint64_t DEFAULT_BLOCK_SIZE = 1000;

// Defaults below are used to construct milliseconds values.
// Lifetime of each command Interest.
static const uint64_t DEFAULT_INTEREST_LIFETIME = 4000;
// FreshnessPeriod assigned to every published Data packet.
static const uint64_t DEFAULT_FRESHNESS_PERIOD = 10000;
// Delay before each "insert check" command is sent.
static const uint64_t DEFAULT_CHECK_PERIOD = 1000;

// How many segments are read and signed ahead of the segment being requested.
static const size_t PRE_SIGN_DATA_COUNT = 10;
43
44class NdnPutFile : ndn::noncopyable
45{
46public:
47 class Error : public std::runtime_error
48 {
49 public:
50 explicit
51 Error(const std::string& what)
52 : std::runtime_error(what)
53 {
54 }
55 };
56
57 NdnPutFile()
58 : isUnversioned(false)
59 , isSingle(false)
60 , useDigestSha256(false)
61 , freshnessPeriod(DEFAULT_FRESHNESS_PERIOD)
62 , interestLifetime(DEFAULT_INTEREST_LIFETIME)
63 , hasTimeout(false)
64 , timeout(0)
65 , insertStream(0)
66 , isVerbose(false)
67
68 , m_scheduler(m_face.getIoService())
69 , m_timestampVersion(toUnixTimestamp(system_clock::now()).count())
70 , m_processId(0)
71 , m_checkPeriod(DEFAULT_CHECK_PERIOD)
72 , m_currentSegmentNo(0)
73 , m_isFinished(false)
74 {
75 }
76
77 void
78 run();
79
80private:
81 void
82 prepareNextData(uint64_t referenceSegmentNo);
83
84 void
85 startInsertCommand();
86
87 void
88 onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data);
89
90 void
91 onInsertCommandTimeout(const ndn::Interest& interest);
92
93 void
94 onInterest(const ndn::Name& prefix, const ndn::Interest& interest);
95
96 void
97 onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest);
98
99 void
100 onRegisterFailed(const ndn::Name& prefix, const std::string& reason);
101
102 void
103 stopProcess();
104
105 void
106 signData(ndn::Data& data);
107
108 void
109 startCheckCommand();
110
111 void
112 onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data);
113
114 void
115 onCheckCommandTimeout(const ndn::Interest& interest);
116
117 ndn::Interest
118 generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
119 const RepoCommandParameter& commandParameter);
120
121public:
122 bool isUnversioned;
123 bool isSingle;
124 bool useDigestSha256;
125 std::string identityForData;
126 std::string identityForCommand;
127 milliseconds freshnessPeriod;
128 milliseconds interestLifetime;
129 bool hasTimeout;
130 milliseconds timeout;
131 ndn::Name repoPrefix;
132 ndn::Name ndnName;
133 std::istream* insertStream;
134 bool isVerbose;
135
136private:
137 ndn::Face m_face;
138 ndn::Scheduler m_scheduler;
139 ndn::KeyChain m_keyChain;
140 ndn::CommandInterestGenerator m_generator;
141 uint64_t m_timestampVersion;
142 uint64_t m_processId;
143 milliseconds m_checkPeriod;
144 size_t m_currentSegmentNo;
145 bool m_isFinished;
146 ndn::Name m_dataPrefix;
147
148 typedef std::map<uint64_t, ndn::shared_ptr<ndn::Data> > DataContainer;
149 DataContainer m_data;
150};
151
152void
153NdnPutFile::prepareNextData(uint64_t referenceSegmentNo)
154{
155 // make sure m_data has [referenceSegmentNo, referenceSegmentNo + PRE_SIGN_DATA_COUNT] Data
156 if (m_isFinished)
157 return;
158
159 size_t nDataToPrepare = PRE_SIGN_DATA_COUNT;
160
161 if (!m_data.empty()) {
162 uint64_t maxSegmentNo = m_data.rbegin()->first;
163
164 if (maxSegmentNo - referenceSegmentNo >= nDataToPrepare) {
165 // nothing to prepare
166 return;
167 }
168
169 nDataToPrepare -= maxSegmentNo - referenceSegmentNo;
170 }
171
172 for (size_t i = 0; i < nDataToPrepare && !m_isFinished; ++i) {
173 uint8_t buffer[DEFAULT_BLOCK_SIZE];
174
175 std::streamsize readSize =
176 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
177
178 if (readSize <= 0) {
179 throw Error("Error reading from the input stream");
180 }
181
182 ndn::shared_ptr<ndn::Data> data =
183 ndn::make_shared<ndn::Data>(Name(m_dataPrefix)
184 .appendSegment(m_currentSegmentNo));
185
186 if (insertStream->peek() == std::istream::traits_type::eof()) {
187 data->setFinalBlockId(ndn::name::Component::fromNumber(m_currentSegmentNo));
188 m_isFinished = true;
189 }
190
191 data->setContent(buffer, readSize);
192 data->setFreshnessPeriod(freshnessPeriod);
193 signData(*data);
194
195 m_data.insert(std::make_pair(m_currentSegmentNo, data));
196
197 ++m_currentSegmentNo;
198 }
199}
200
201void
202NdnPutFile::run()
203{
204 m_dataPrefix = ndnName;
205 if (!isUnversioned)
206 m_dataPrefix.appendVersion(m_timestampVersion);
207
208 if (isVerbose)
209 std::cerr << "setInterestFilter for " << m_dataPrefix << std::endl;
210 m_face.setInterestFilter(m_dataPrefix,
211 isSingle ?
212 ndn::bind(&NdnPutFile::onSingleInterest, this, _1, _2)
213 :
214 ndn::bind(&NdnPutFile::onInterest, this, _1, _2),
215 ndn::bind(&NdnPutFile::onRegisterFailed, this, _1, _2));
216
217 // @todo Move startCommand
218 // setInterestFilter doesn't currently have "onSuccess" callback,
219 // so insertCommand needs to be started right away
220 startInsertCommand();
221
222 if (hasTimeout)
223 m_scheduler.scheduleEvent(timeout, ndn::bind(&NdnPutFile::stopProcess, this));
224
225 m_face.processEvents();
226}
227
228void
229NdnPutFile::startInsertCommand()
230{
231 RepoCommandParameter parameters;
232 parameters.setName(m_dataPrefix);
233 if (!isSingle) {
234 parameters.setStartBlockId(0);
235 }
236
237 ndn::Interest commandInterest = generateCommandInterest(repoPrefix, "insert", parameters);
238 m_face.expressInterest(commandInterest,
239 ndn::bind(&NdnPutFile::onInsertCommandResponse, this, _1, _2),
240 ndn::bind(&NdnPutFile::onInsertCommandTimeout, this, _1));
241}
242
243void
244NdnPutFile::onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data)
245{
246 RepoCommandResponse response(data.getContent().blockFromValue());
247 int statusCode = response.getStatusCode();
248 if (statusCode >= 400) {
249 throw Error("insert command failed with code " +
250 boost::lexical_cast<std::string>(statusCode));
251 }
252 m_processId = response.getProcessId();
253
254 m_scheduler.scheduleEvent(m_checkPeriod,
255 ndn::bind(&NdnPutFile::startCheckCommand, this));
256}
257
258void
259NdnPutFile::onInsertCommandTimeout(const ndn::Interest& interest)
260{
261 throw Error("command response timeout");
262}
263
264void
265NdnPutFile::onInterest(const ndn::Name& prefix, const ndn::Interest& interest)
266{
267 if (interest.getName().size() != prefix.size() + 1) {
268 if (isVerbose) {
269 std::cerr << "Error processing incoming interest " << interest << ": "
270 << "Unrecognized Interest" << std::endl;
271 }
272 return;
273 }
274
275 uint64_t segmentNo;
276 try {
277 ndn::Name::Component segmentComponent = interest.getName().get(prefix.size());
278 segmentNo = segmentComponent.toSegment();
279 }
280 catch (tlv::Error& e) {
281 if (isVerbose) {
282 std::cerr << "Error processing incoming interest " << interest << ": "
283 << e.what() << std::endl;
284 }
285 return;
286 }
287
288 prepareNextData(segmentNo);
289
290 DataContainer::iterator item = m_data.find(segmentNo);
291 if (item == m_data.end()) {
292 if (isVerbose) {
293 std::cerr << "Requested segment [" << segmentNo << "] does not exist" << std::endl;
294 }
295 return;
296 }
297
298 m_face.put(*item->second);
299}
300
301void
302NdnPutFile::onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest)
303{
304 BOOST_ASSERT(prefix == m_dataPrefix);
305
306 if (prefix != interest.getName()) {
307 if (isVerbose) {
308 std::cerr << "Received unexpected interest " << interest << std::endl;
309 }
310 return;
311 }
312
313 uint8_t buffer[DEFAULT_BLOCK_SIZE];
314 std::streamsize readSize =
315 boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
316
317 if (readSize <= 0) {
318 throw Error("Error reading from the input stream");
319 }
320
321 if (insertStream->peek() != std::istream::traits_type::eof()) {
322 throw Error("Input data does not fit into one Data packet");
323 }
324
325 ndn::Data data(m_dataPrefix);
326 data.setContent(buffer, readSize);
327 data.setFreshnessPeriod(freshnessPeriod);
328 signData(data);
329 m_face.put(data);
330
331 m_isFinished = true;
332}
333
334void
335NdnPutFile::onRegisterFailed(const ndn::Name& prefix, const std::string& reason)
336{
337 throw Error("onRegisterFailed: " + reason);
338}
339
340void
341NdnPutFile::stopProcess()
342{
343 m_face.getIoService().stop();
344}
345
346void
347NdnPutFile::signData(ndn::Data& data)
348{
349 if (useDigestSha256) {
350 m_keyChain.signWithSha256(data);
351 }
352 else {
353 if (identityForData.empty())
354 m_keyChain.sign(data);
355 else {
356 ndn::Name keyName = m_keyChain.getDefaultKeyNameForIdentity(ndn::Name(identityForData));
357 ndn::Name certName = m_keyChain.getDefaultCertificateNameForKey(keyName);
358 m_keyChain.sign(data, certName);
359 }
360 }
361}
362
363void
364NdnPutFile::startCheckCommand()
365{
366 ndn::Interest checkInterest = generateCommandInterest(repoPrefix, "insert check",
367 RepoCommandParameter()
368 .setProcessId(m_processId));
369 m_face.expressInterest(checkInterest,
370 ndn::bind(&NdnPutFile::onCheckCommandResponse, this, _1, _2),
371 ndn::bind(&NdnPutFile::onCheckCommandTimeout, this, _1));
372}
373
374void
375NdnPutFile::onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data)
376{
377 RepoCommandResponse response(data.getContent().blockFromValue());
378 int statusCode = response.getStatusCode();
379 if (statusCode >= 400) {
380 throw Error("Insert check command failed with code: " +
381 boost::lexical_cast<std::string>(statusCode));
382 }
383
384 if (m_isFinished) {
385 uint64_t insertCount = response.getInsertNum();
386
387 if (isSingle) {
388 if (insertCount == 1) {
389 m_face.getIoService().stop();
390 return;
391 }
392 }
393 // Technically, the check should not infer, but directly has signal from repo that
394 // write operation has been finished
395
396 if (insertCount == m_currentSegmentNo) {
397 m_face.getIoService().stop();
398 return;
399 }
400 }
401
402 m_scheduler.scheduleEvent(m_checkPeriod,
403 ndn::bind(&NdnPutFile::startCheckCommand, this));
404}
405
406void
407NdnPutFile::onCheckCommandTimeout(const ndn::Interest& interest)
408{
409 throw Error("check response timeout");
410}
411
412ndn::Interest
413NdnPutFile::generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
414 const RepoCommandParameter& commandParameter)
415{
416 ndn::Interest interest(ndn::Name(commandPrefix)
417 .append(command)
418 .append(commandParameter.wireEncode()));
419 interest.setInterestLifetime(interestLifetime);
420
421 if (identityForCommand.empty())
422 m_generator.generate(interest);
423 else {
424 m_generator.generateWithIdentity(interest, ndn::Name(identityForCommand));
425 }
426
427 return interest;
428}
429
/**
 * @brief Print the command-line synopsis to stderr and terminate with exit status 1
 *
 * The synopsis lists exactly the options accepted by the getopt() string in main().
 */
static void
usage()
{
  fprintf(stderr,
          "ndnputfile [-u] [-s] [-D] [-v] [-h] [-i identity] [-I identity]"
          " [-x freshness] [-l lifetime] [-w timeout] repo-prefix ndn-name filename\n"
          "\n"
          " Write a file into a repo.\n"
          "  -u: unversioned: do not add a version component\n"
          "  -s: single: do not add version or segment component, implies -u\n"
          "  -D: use DigestSha256 signing method instead of SignatureSha256WithRsa\n"
          "  -i: specify identity used for signing Data\n"
          "  -I: specify identity used for signing commands\n"
          "  -x: FreshnessPeriod in milliseconds\n"
          "  -l: InterestLifetime in milliseconds for each command\n"
          "  -w: timeout in milliseconds for whole process (default unlimited)\n"
          "  -v: be verbose\n"
          "  -h: print this help message and exit\n"
          "  repo-prefix: repo command prefix\n"
          "  ndn-name: NDN Name prefix for written Data\n"
          "  filename: local file name; \"-\" reads from stdin\n"
          );
  exit(1);
}
453
454int
455main(int argc, char** argv)
456{
457 NdnPutFile ndnPutFile;
458 int opt;
459 while ((opt = getopt(argc, argv, "usDi:I:x:l:w:vh")) != -1) {
460 switch (opt) {
461 case 'u':
462 ndnPutFile.isUnversioned = true;
463 break;
464 case 's':
465 ndnPutFile.isSingle = true;
466 break;
467 case 'D':
468 ndnPutFile.useDigestSha256 = true;
469 break;
470 case 'i':
471 ndnPutFile.identityForData = std::string(optarg);
472 break;
473 case 'I':
474 ndnPutFile.identityForCommand = std::string(optarg);
475 break;
476 case 'x':
477 try {
478 ndnPutFile.freshnessPeriod = milliseconds(boost::lexical_cast<uint64_t>(optarg));
479 }
480 catch (boost::bad_lexical_cast&) {
481 std::cerr << "-x option should be an integer.";
482 return 1;
483 }
484 break;
485 case 'l':
486 try {
487 ndnPutFile.interestLifetime = milliseconds(boost::lexical_cast<uint64_t>(optarg));
488 }
489 catch (boost::bad_lexical_cast&) {
490 std::cerr << "-l option should be an integer.";
491 return 1;
492 }
493 break;
494 case 'w':
495 ndnPutFile.hasTimeout = true;
496 try {
497 ndnPutFile.timeout = milliseconds(boost::lexical_cast<uint64_t>(optarg));
498 }
499 catch (boost::bad_lexical_cast&) {
500 std::cerr << "-w option should be an integer.";
501 return 1;
502 }
503 break;
504 case 'v':
505 ndnPutFile.isVerbose = true;
506 break;
507 case 'h':
508 usage();
509 break;
510 default:
511 break;
512 }
513 }
514
515 argc -= optind;
516 argv += optind;
517
518 if (argc != 3)
519 usage();
520
521 ndnPutFile.repoPrefix = Name(argv[0]);
522 ndnPutFile.ndnName = Name(argv[1]);
523 if (strcmp(argv[2], "-") == 0) {
524
525 ndnPutFile.insertStream = &std::cin;
526 ndnPutFile.run();
527 }
528 else {
529 std::ifstream inputFileStream(argv[2], std::ios::in | std::ios::binary);
530 if (!inputFileStream.is_open()) {
531 std::cerr << "ERROR: cannot open " << argv[2] << std::endl;
532 return 1;
533 }
534
535 ndnPutFile.insertStream = &inputFileStream;
536 ndnPutFile.run();
537 }
538
539 // ndnPutFile MUST NOT be used anymore because .insertStream is a dangling pointer
540
541 return 0;
542}
543
544} // namespace repo
545
546int
547main(int argc, char** argv)
548{
549 try {
550 return repo::main(argc, argv);
551 }
552 catch (std::exception& e) {
553 std::cerr << "ERROR: " << e.what() << std::endl;
554 return 2;
555 }
556}