tools: ndnputfile
+#! /usr/bin/env python
+# encoding: utf-8
+from waflib import Options
+from waflib.Configure import conf
+def options(opt):
+ opt.add_option('--with-sqlite3', type='string', default=None,
+ dest='with_sqlite3', help='''Path to SQLite3, e.g., /usr/local''')
+ opt.add_option('--without-sqlite-locking', action='store_false', default=True,
+ dest='with_sqlite_locking',
+ help='''Disable filesystem locking in sqlite3 database (use unix-dot '''
+ '''locking mechanism instead). This option may be necessary if home '''
+ '''directory is hosted on NFS.''')
+def check_sqlite3(self, *k, **kw):
+ root = k and k[0] or kw.get('path', None) or Options.options.with_sqlite3
+ mandatory = kw.get('mandatory', True)
+ var = kw.get('uselib_store', 'SQLITE3')
+ if root:
+ self.check_cxx(lib='sqlite3',
+ msg='Checking for SQLite3 library',
+ define_name='HAVE_%s' % var,
+ uselib_store=var,
+ mandatory=mandatory,
+ cxxflags="-I%s/include" % root,
+ linkflags="-L%s/lib" % root)
+ else:
+ try:
+ self.check_cfg(package='sqlite3',
+ args=['--cflags', '--libs'],
+ uselib_store='SQLITE3',
+ mandatory=True)
+ except:
+ self.check_cxx(lib='sqlite3',
+ msg='Checking for SQLite3 library',
+ define_name='HAVE_%s' % var,
+ uselib_store=var,
+ mandatory=mandatory)
//check whether sendingSegment exceeds
- if (sendingSegment > response.getEndBlockId()) {
+ if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
//do not do anything
@@ -327,7 +327,7 @@
retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
//increase the next seg and put it into the queue
- if ((nextSegment + 1) <= response.getEndBlockId()) {
+ if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in file. if (not, see <>.
+ */
+#include "helpers/repo-command-parameter.hpp"
+#include "helpers/repo-command-response.hpp"
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/util/command-interest-generator.hpp>
+#include <fstream>
+#include <string>
+#include <stdlib.h>
+#include <boost/filesystem.hpp>
+#include <boost/iostreams/operations.hpp>
+#include <boost/iostreams/read.hpp>
+namespace repo {
+using namespace ndn::time;
+static const uint64_t DEFAULT_BLOCK_SIZE = 1000;
+static const uint64_t DEFAULT_INTEREST_LIFETIME = 4000;
+static const uint64_t DEFAULT_FRESHNESS_PERIOD = 10000;
+static const uint64_t DEFAULT_CHECK_PERIOD = 1000;
+static const size_t PRE_SIGN_DATA_COUNT = 10;
+class NdnPutFile : ndn::noncopyable
+ class Error : public std::runtime_error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
+ NdnPutFile()
+ : isUnversioned(false)
+ , isSingle(false)
+ , useDigestSha256(false)
+ , hasTimeout(false)
+ , timeout(0)
+ , insertStream(0)
+ , isVerbose(false)
+ , m_scheduler(m_face.getIoService())
+ , m_timestampVersion(toUnixTimestamp(system_clock::now()).count())
+ , m_processId(0)
+ , m_checkPeriod(DEFAULT_CHECK_PERIOD)
+ , m_currentSegmentNo(0)
+ , m_isFinished(false)
+ {
+ }
+ void
+ run();
+ void
+ prepareNextData(uint64_t referenceSegmentNo);
+ void
+ startInsertCommand();
+ void
+ onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data);
+ void
+ onInsertCommandTimeout(const ndn::Interest& interest);
+ void
+ onInterest(const ndn::Name& prefix, const ndn::Interest& interest);
+ void
+ onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest);
+ void
+ onRegisterFailed(const ndn::Name& prefix, const std::string& reason);
+ void
+ stopProcess();
+ void
+ signData(ndn::Data& data);
+ void
+ startCheckCommand();
+ void
+ onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data);
+ void
+ onCheckCommandTimeout(const ndn::Interest& interest);
+ ndn::Interest
+ generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
+ const RepoCommandParameter& commandParameter);
+ bool isUnversioned;
+ bool isSingle;
+ bool useDigestSha256;
+ std::string identityForData;
+ std::string identityForCommand;
+ milliseconds freshnessPeriod;
+ milliseconds interestLifetime;
+ bool hasTimeout;
+ milliseconds timeout;
+ ndn::Name repoPrefix;
+ ndn::Name ndnName;
+ std::istream* insertStream;
+ bool isVerbose;
+ ndn::Face m_face;
+ ndn::Scheduler m_scheduler;
+ ndn::KeyChain m_keyChain;
+ ndn::CommandInterestGenerator m_generator;
+ uint64_t m_timestampVersion;
+ uint64_t m_processId;
+ milliseconds m_checkPeriod;
+ size_t m_currentSegmentNo;
+ bool m_isFinished;
+ ndn::Name m_dataPrefix;
+ typedef std::map<uint64_t, ndn::shared_ptr<ndn::Data> > DataContainer;
+ DataContainer m_data;
+NdnPutFile::prepareNextData(uint64_t referenceSegmentNo)
+ // make sure m_data has [referenceSegmentNo, referenceSegmentNo + PRE_SIGN_DATA_COUNT] Data
+ if (m_isFinished)
+ return;
+ size_t nDataToPrepare = PRE_SIGN_DATA_COUNT;
+ if (!m_data.empty()) {
+ uint64_t maxSegmentNo = m_data.rbegin()->first;
+ if (maxSegmentNo - referenceSegmentNo >= nDataToPrepare) {
+ // nothing to prepare
+ return;
+ }
+ nDataToPrepare -= maxSegmentNo - referenceSegmentNo;
+ }
+ for (size_t i = 0; i < nDataToPrepare && !m_isFinished; ++i) {
+ uint8_t buffer[DEFAULT_BLOCK_SIZE];
+ std::streamsize readSize =
+ boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
+ if (readSize <= 0) {
+ throw Error("Error reading from the input stream");
+ }
+ ndn::shared_ptr<ndn::Data> data =
+ ndn::make_shared<ndn::Data>(Name(m_dataPrefix)
+ .appendSegment(m_currentSegmentNo));
+ if (insertStream->peek() == std::istream::traits_type::eof()) {
+ data->setFinalBlockId(ndn::name::Component::fromNumber(m_currentSegmentNo));
+ m_isFinished = true;
+ }
+ data->setContent(buffer, readSize);
+ data->setFreshnessPeriod(freshnessPeriod);
+ signData(*data);
+ m_data.insert(std::make_pair(m_currentSegmentNo, data));
+ ++m_currentSegmentNo;
+ }
+ m_dataPrefix = ndnName;
+ if (!isUnversioned)
+ m_dataPrefix.appendVersion(m_timestampVersion);
+ if (isVerbose)
+ std::cerr << "setInterestFilter for " << m_dataPrefix << std::endl;
+ m_face.setInterestFilter(m_dataPrefix,
+ isSingle ?
+ ndn::bind(&NdnPutFile::onSingleInterest, this, _1, _2)
+ :
+ ndn::bind(&NdnPutFile::onInterest, this, _1, _2),
+ ndn::bind(&NdnPutFile::onRegisterFailed, this, _1, _2));
+ // @todo Move startCommand
+ // setInterestFilter doesn't currently have "onSuccess" callback,
+ // so insertCommand needs to be started right away
+ startInsertCommand();
+ if (hasTimeout)
+ m_scheduler.scheduleEvent(timeout, ndn::bind(&NdnPutFile::stopProcess, this));
+ m_face.processEvents();
+ RepoCommandParameter parameters;
+ parameters.setName(m_dataPrefix);
+ if (!isSingle) {
+ parameters.setStartBlockId(0);
+ }
+ ndn::Interest commandInterest = generateCommandInterest(repoPrefix, "insert", parameters);
+ m_face.expressInterest(commandInterest,
+ ndn::bind(&NdnPutFile::onInsertCommandResponse, this, _1, _2),
+ ndn::bind(&NdnPutFile::onInsertCommandTimeout, this, _1));
+NdnPutFile::onInsertCommandResponse(const ndn::Interest& interest, ndn::Data& data)
+ RepoCommandResponse response(data.getContent().blockFromValue());
+ int statusCode = response.getStatusCode();
+ if (statusCode >= 400) {
+ throw Error("insert command failed with code " +
+ boost::lexical_cast<std::string>(statusCode));
+ }
+ m_processId = response.getProcessId();
+ m_scheduler.scheduleEvent(m_checkPeriod,
+ ndn::bind(&NdnPutFile::startCheckCommand, this));
+NdnPutFile::onInsertCommandTimeout(const ndn::Interest& interest)
+ throw Error("command response timeout");
+NdnPutFile::onInterest(const ndn::Name& prefix, const ndn::Interest& interest)
+ if (interest.getName().size() != prefix.size() + 1) {
+ if (isVerbose) {
+ std::cerr << "Error processing incoming interest " << interest << ": "
+ << "Unrecognized Interest" << std::endl;
+ }
+ return;
+ }
+ uint64_t segmentNo;
+ try {
+ ndn::Name::Component segmentComponent = interest.getName().get(prefix.size());
+ segmentNo = segmentComponent.toSegment();
+ }
+ catch (tlv::Error& e) {
+ if (isVerbose) {
+ std::cerr << "Error processing incoming interest " << interest << ": "
+ << e.what() << std::endl;
+ }
+ return;
+ }
+ prepareNextData(segmentNo);
+ DataContainer::iterator item = m_data.find(segmentNo);
+ if (item == m_data.end()) {
+ if (isVerbose) {
+ std::cerr << "Requested segment [" << segmentNo << "] does not exist" << std::endl;
+ }
+ return;
+ }
+ m_face.put(*item->second);
+NdnPutFile::onSingleInterest(const ndn::Name& prefix, const ndn::Interest& interest)
+ BOOST_ASSERT(prefix == m_dataPrefix);
+ if (prefix != interest.getName()) {
+ if (isVerbose) {
+ std::cerr << "Received unexpected interest " << interest << std::endl;
+ }
+ return;
+ }
+ uint8_t buffer[DEFAULT_BLOCK_SIZE];
+ std::streamsize readSize =
+ boost::iostreams::read(*insertStream, reinterpret_cast<char*>(buffer), DEFAULT_BLOCK_SIZE);
+ if (readSize <= 0) {
+ throw Error("Error reading from the input stream");
+ }
+ if (insertStream->peek() != std::istream::traits_type::eof()) {
+ throw Error("Input data does not fit into one Data packet");
+ }
+ ndn::Data data(m_dataPrefix);
+ data.setContent(buffer, readSize);
+ data.setFreshnessPeriod(freshnessPeriod);
+ signData(data);
+ m_face.put(data);
+ m_isFinished = true;
+NdnPutFile::onRegisterFailed(const ndn::Name& prefix, const std::string& reason)
+ throw Error("onRegisterFailed: " + reason);
+ m_face.getIoService().stop();
+NdnPutFile::signData(ndn::Data& data)
+ if (useDigestSha256) {
+ m_keyChain.signWithSha256(data);
+ }
+ else {
+ if (identityForData.empty())
+ m_keyChain.sign(data);
+ else {
+ ndn::Name keyName = m_keyChain.getDefaultKeyNameForIdentity(ndn::Name(identityForData));
+ ndn::Name certName = m_keyChain.getDefaultCertificateNameForKey(keyName);
+ m_keyChain.sign(data, certName);
+ }
+ }
+ ndn::Interest checkInterest = generateCommandInterest(repoPrefix, "insert check",
+ RepoCommandParameter()
+ .setProcessId(m_processId));
+ m_face.expressInterest(checkInterest,
+ ndn::bind(&NdnPutFile::onCheckCommandResponse, this, _1, _2),
+ ndn::bind(&NdnPutFile::onCheckCommandTimeout, this, _1));
+NdnPutFile::onCheckCommandResponse(const ndn::Interest& interest, ndn::Data& data)
+ RepoCommandResponse response(data.getContent().blockFromValue());
+ int statusCode = response.getStatusCode();
+ if (statusCode >= 400) {
+ throw Error("Insert check command failed with code: " +
+ boost::lexical_cast<std::string>(statusCode));
+ }
+ if (m_isFinished) {
+ uint64_t insertCount = response.getInsertNum();
+ if (isSingle) {
+ if (insertCount == 1) {
+ m_face.getIoService().stop();
+ return;
+ }
+ }
+ // Technically, the check should not infer, but directly has signal from repo that
+ // write operation has been finished
+ if (insertCount == m_currentSegmentNo) {
+ m_face.getIoService().stop();
+ return;
+ }
+ }
+ m_scheduler.scheduleEvent(m_checkPeriod,
+ ndn::bind(&NdnPutFile::startCheckCommand, this));
+NdnPutFile::onCheckCommandTimeout(const ndn::Interest& interest)
+ throw Error("check response timeout");
+NdnPutFile::generateCommandInterest(const ndn::Name& commandPrefix, const std::string& command,
+ const RepoCommandParameter& commandParameter)
+ ndn::Interest interest(ndn::Name(commandPrefix)
+ .append(command)
+ .append(commandParameter.wireEncode()));
+ interest.setInterestLifetime(interestLifetime);
+ if (identityForCommand.empty())
+ m_generator.generate(interest);
+ else {
+ m_generator.generateWithIdentity(interest, ndn::Name(identityForCommand));
+ }
+ return interest;
+static void
+ fprintf(stderr,
+ "ndnputfile [-u] [-s] [-D] [-d] [-i identity] [-I identity]"
+ " [-x freshness] [-l lifetime] [-w timeout] repo-prefix ndn-name filename\n"
+ "\n"
+ " Write a file into a repo.\n"
+ " -u: unversioned: do not add a version component\n"
+ " -s: single: do not add version or segment component, implies -u\n"
+ " -D: use DigestSha256 signing method instead of SignatureSha256WithRsa\n"
+ " -i: specify identity used for signing Data\n"
+ " -I: specify identity used for signing commands\n"
+ " -x: FreshnessPeriod in milliseconds\n"
+ " -l: InterestLifetime in milliseconds for each command\n"
+ " -w: timeout in milliseconds for whole process (default unlimited)\n"
+ " -v: be verbose\n"
+ " repo-prefix: repo command prefix\n"
+ " ndn-name: NDN Name prefix for written Data\n"
+ " filename: local file name; \"-\" reads from stdin\n"
+ );
+ exit(1);
+main(int argc, char** argv)
+ NdnPutFile ndnPutFile;
+ int opt;
+ while ((opt = getopt(argc, argv, "usDi:I:x:l:w:vh")) != -1) {
+ switch (opt) {
+ case 'u':
+ ndnPutFile.isUnversioned = true;
+ break;
+ case 's':
+ ndnPutFile.isSingle = true;
+ break;
+ case 'D':
+ ndnPutFile.useDigestSha256 = true;
+ break;
+ case 'i':
+ ndnPutFile.identityForData = std::string(optarg);
+ break;
+ case 'I':
+ ndnPutFile.identityForCommand = std::string(optarg);
+ break;
+ case 'x':
+ try {
+ ndnPutFile.freshnessPeriod = milliseconds(boost::lexical_cast<uint64_t>(optarg));
+ }
+ catch (boost::bad_lexical_cast&) {
+ std::cerr << "-x option should be an integer.";
+ return 1;
+ }
+ break;
+ case 'l':
+ try {
+ ndnPutFile.interestLifetime = milliseconds(boost::lexical_cast<uint64_t>(optarg));
+ }
+ catch (boost::bad_lexical_cast&) {
+ std::cerr << "-l option should be an integer.";
+ return 1;
+ }
+ break;
+ case 'w':
+ ndnPutFile.hasTimeout = true;
+ try {
+ ndnPutFile.timeout = milliseconds(boost::lexical_cast<uint64_t>(optarg));
+ }
+ catch (boost::bad_lexical_cast&) {
+ std::cerr << "-w option should be an integer.";
+ return 1;
+ }
+ break;
+ case 'v':
+ ndnPutFile.isVerbose = true;
+ break;
+ case 'h':
+ usage();
+ break;
+ default:
+ break;
+ }
+ }
+ argc -= optind;
+ argv += optind;
+ if (argc != 3)
+ usage();
+ ndnPutFile.repoPrefix = Name(argv[0]);
+ ndnPutFile.ndnName = Name(argv[1]);
+ if (strcmp(argv[2], "-") == 0) {
+ ndnPutFile.insertStream = &std::cin;
+ }
+ else {
+ std::ifstream inputFileStream(argv[2], std::ios::in | std::ios::binary);
+ if (!inputFileStream.is_open()) {
+ std::cerr << "ERROR: cannot open " << argv[2] << std::endl;
+ return 1;
+ }
+ ndnPutFile.insertStream = &inputFileStream;
+ }
+ // ndnPutFile MUST NOT be used anymore because .insertStream is a dangling pointer
+ return 0;
+} // namespace repo
+main(int argc, char** argv)
+ try {
+ return repo::main(argc, argv);
+ }
+ catch (std::exception& e) {
+ std::cerr << "ERROR: " << e.what() << std::endl;
+ return 2;
+ }
target='%s' % (str(app.change_ext('', '.cpp'))),
+ includes='..',
def options(opt):
opt.load('compiler_c compiler_cxx gnu_dirs')
- opt.load('boost default-compiler-flags doxygen', tooldir=['.waf-tools'])
+ opt.load('boost default-compiler-flags doxygen sqlite3', tooldir=['.waf-tools'])
ropt = opt.add_option_group('ndn-repo-ng Options')
@@ -16,21 +16,14 @@
ropt.add_option('--without-tools', action='store_false', default=True, dest='with_tools',
help='''Do not build tools''')
- ropt.add_option('--without-sqlite-locking', action='store_false', default=True,
- dest='with_sqlite_locking',
- help='''Disable filesystem locking in sqlite3 database (use unix-dot '''
- '''locking mechanism instead). This option may be necessary if home '''
- '''directory is hosted on NFS.''')
def configure(conf):
- conf.load("compiler_c compiler_cxx gnu_dirs boost default-compiler-flags")
- conf.check_cfg(package='sqlite3', args=['--cflags', '--libs'],
- uselib_store='SQLITE3', mandatory=True)
+ conf.load("compiler_c compiler_cxx gnu_dirs boost default-compiler-flags sqlite3")
conf.check_cfg(package='libndn-cxx', args=['--cflags', '--libs'],
uselib_store='NDN_CXX', mandatory=True)
+ conf.check_sqlite3(mandatory=True)
if conf.options.with_tests:
conf.env['WITH_TESTS'] = True