mgmt: Use InMemoryStorage for StatusDataset and Notification produced by managers
Change-Id: I2ed57061e25638c01df7a0323ed303cafe1be455
ref: #2182
diff --git a/src/mgmt/dispatcher.cpp b/src/mgmt/dispatcher.cpp
index 5dbe5cd..3b121ea 100644
--- a/src/mgmt/dispatcher.cpp
+++ b/src/mgmt/dispatcher.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2015 Regents of the University of California.
+ * Copyright (c) 2013-2016 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -28,6 +28,8 @@
namespace ndn {
namespace mgmt {
+const time::milliseconds DEFAULT_FRESHNESS_PERIOD = time::milliseconds(1000);
+
Authorization
makeAcceptAllAuthorization()
{
@@ -41,10 +43,12 @@
}
Dispatcher::Dispatcher(Face& face, security::KeyChain& keyChain,
- const security::SigningInfo& signingInfo)
+ const security::SigningInfo& signingInfo,
+ size_t imsCapacity)
: m_face(face)
, m_keyChain(keyChain)
, m_signingInfo(signingInfo)
+ , m_storage(m_face.getIoService(), imsCapacity)
{
}
@@ -147,16 +151,46 @@
}
void
-Dispatcher::sendData(const Name& dataName, const Block& content,
- const MetaInfo& metaInfo)
+Dispatcher::queryStorage(const Name& prefix, const Interest& interest,
+ const InterestHandler& missContinuation)
+{
+ auto data = m_storage.find(interest);
+ if (data == nullptr) {
+ // invoke missContinuation to process this Interest if the query fails.
+ missContinuation(prefix, interest);
+ }
+ else {
+ // send the fetched data through face if query succeeds.
+ sendOnFace(*data);
+ }
+}
+
+void
+Dispatcher::sendData(const Name& dataName, const Block& content, const MetaInfo& metaInfo,
+ SendDestination option, time::milliseconds imsFresh)
{
shared_ptr<Data> data = make_shared<Data>(dataName);
- data->setContent(content).setMetaInfo(metaInfo);
+ data->setContent(content).setMetaInfo(metaInfo).setFreshnessPeriod(DEFAULT_FRESHNESS_PERIOD);
m_keyChain.sign(*data, m_signingInfo);
+ if (option == SendDestination::IMS || option == SendDestination::FACE_AND_IMS) {
+ lp::CachePolicy policy;
+ policy.setPolicy(lp::CachePolicyType::NO_CACHE);
+ data->setTag(make_shared<lp::CachePolicyTag>(policy));
+ m_storage.insert(*data, imsFresh);
+ }
+
+ if (option == SendDestination::FACE || option == SendDestination::FACE_AND_IMS) {
+ sendOnFace(*data);
+ }
+}
+
+void
+Dispatcher::sendOnFace(const Data& data)
+{
try {
- m_face.put(*data);
+ m_face.put(data);
}
catch (Face::Error& e) {
#ifdef NDN_CXX_MGMT_DISPATCHER_ENABLE_LOGGING
@@ -210,14 +244,15 @@
void
Dispatcher::sendControlResponse(const ControlResponse& resp, const Interest& interest,
- bool isNack/*= false*/)
+ bool isNack)
{
- MetaInfo info;
+ MetaInfo metaInfo;
if (isNack) {
- info.setType(tlv::ContentType_Nack);
+ metaInfo.setType(tlv::ContentType_Nack);
}
-
- sendData(interest.getName(), resp.wireEncode(), info);
+ // control response is always sent out through the face
+ sendData(interest.getName(), resp.wireEncode(), metaInfo, SendDestination::FACE,
+ DEFAULT_FRESHNESS_PERIOD);
}
void
@@ -238,8 +273,11 @@
_1, _2, _3, handler);
AuthorizationRejectedCallback rejected =
bind(&Dispatcher::afterAuthorizationRejected, this, _1, _2);
- m_handlers[relPrefix] = bind(&Dispatcher::processStatusDatasetInterest, this,
- _1, _2, authorization, accepted, rejected);
+
+ // fall back to the regular processing path if the in-memory storage lookup misses
+ InterestHandler missContinuation = bind(&Dispatcher::processStatusDatasetInterest, this,
+ _1, _2, authorization, accepted, rejected);
+ m_handlers[relPrefix] = bind(&Dispatcher::queryStorage, this, _1, _2, missContinuation);
}
void
@@ -267,10 +305,31 @@
const Interest& interest,
const StatusDatasetHandler& handler)
{
- StatusDatasetContext context(interest, bind(&Dispatcher::sendData, this, _1, _2, _3));
+ StatusDatasetContext context(interest,
+ bind(&Dispatcher::sendStatusDatasetSegment, this, _1, _2, _3, _4),
+ bind(&Dispatcher::sendControlResponse, this, _1, interest, true));
handler(prefix, interest, context);
}
+void
+Dispatcher::sendStatusDatasetSegment(const Name& dataName, const Block& content,
+ time::milliseconds imsFresh, bool isFinalBlock)
+{
+ // the first segment will be sent to both places (the face and the in-memory storage)
+ // other segments will be inserted into the in-memory storage only
+ auto destination = SendDestination::IMS;
+ if (dataName[-1].toSegment() == 0) {
+ destination = SendDestination::FACE_AND_IMS;
+ }
+
+ MetaInfo metaInfo;
+ if (isFinalBlock) {
+ metaInfo.setFinalBlockId(dataName[-1]);
+ }
+
+ sendData(dataName, content, metaInfo, destination, imsFresh);
+}
+
PostNotification
Dispatcher::addNotificationStream(const PartialName& relPrefix)
{
@@ -282,6 +341,11 @@
throw std::out_of_range("relPrefix overlaps with another relPrefix");
}
+ // keep silent if Interest does not match a stored notification
+ InterestHandler missContinuation = bind([]{});
+
+ // register a handler for the subscriber of this notification stream
+ m_handlers[relPrefix] = bind(&Dispatcher::queryStorage, this, _1, _2, missContinuation);
m_streams[relPrefix] = 0;
return bind(&Dispatcher::postNotification, this, _1, relPrefix);
}
@@ -299,7 +363,11 @@
Name streamName(m_topLevelPrefixes.begin()->second.topPrefix);
streamName.append(relPrefix);
streamName.appendSequenceNumber(m_streams[streamName]++);
- sendData(streamName, notification, MetaInfo());
+
+ // notification is sent out by the face after inserting into the in-memory storage,
+ // because a request may be pending in the PIT
+ sendData(streamName, notification, MetaInfo(), SendDestination::FACE_AND_IMS,
+ DEFAULT_FRESHNESS_PERIOD);
}
} // namespace mgmt
diff --git a/src/mgmt/dispatcher.hpp b/src/mgmt/dispatcher.hpp
index 1e1195e..b19aba6 100644
--- a/src/mgmt/dispatcher.hpp
+++ b/src/mgmt/dispatcher.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2015 Regents of the University of California.
+ * Copyright (c) 2013-2016 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -25,6 +25,7 @@
#include "../face.hpp"
#include "../security/key-chain.hpp"
#include "../encoding/block.hpp"
+#include "../util/in-memory-storage-fifo.hpp"
#include "control-response.hpp"
#include "control-parameters.hpp"
#include "status-dataset-context.hpp"
@@ -142,9 +143,11 @@
* \param face the Face on which the dispatcher operates
* \param keyChain a KeyChain to sign Data
* \param signingInfo signing parameters to sign Data with \p keyChain
+ * \param imsCapacity capacity of the internal InMemoryStorage used by dispatcher
*/
Dispatcher(Face& face, security::KeyChain& keyChain,
- const security::SigningInfo& signingInfo = security::SigningInfo());
+ const security::SigningInfo& signingInfo = security::SigningInfo(),
+ size_t imsCapacity = 256);
virtual
~Dispatcher();
@@ -315,9 +318,52 @@
void
afterAuthorizationRejected(RejectReply act, const Interest& interest);
+ /**
+ * @brief query Data from the in-memory storage by a given Interest
+ *
+ * if the query fails, invoke @p missContinuation to process @p interest.
+ *
+ * @param prefix the top-level prefix
+ * @param interest the request
+ * @param missContinuation the handler of request when the query fails
+ */
void
- sendData(const Name& dataName, const Block& content,
- const MetaInfo& metaInfo);
+ queryStorage(const Name& prefix, const Interest& interest, const InterestHandler& missContinuation);
+
+ enum class SendDestination {
+ NONE = 0,
+ FACE = 1,
+ IMS = 2,
+ FACE_AND_IMS = 3
+ };
+
+ /**
+ * @brief send data to the face or in-memory storage
+ *
+ * create a data packet with the given @p dataName, @p content, and @p metaInfo,
+ * set its FreshnessPeriod to DEFAULT_FRESHNESS_PERIOD, and then send it out through the face and/or
+ * insert it into the in-memory storage as specified in @p destination.
+ *
+ * if it's toward the in-memory storage, set its CachePolicy to NO_CACHE and limit
+ * its FreshnessPeriod in the storage to @p imsFresh
+ *
+ * @param dataName the name of this piece of data
+ * @param content the content of this piece of data
+ * @param metaInfo some meta information of this piece of data
+ * @param destination where to send this piece of data
+ * @param imsFresh freshness period of this piece of data in in-memory storage
+ */
+ void
+ sendData(const Name& dataName, const Block& content, const MetaInfo& metaInfo,
+ SendDestination destination, time::milliseconds imsFresh);
+
+ /**
+ * @brief send out a data packet through the face
+ *
+ * @param data the data packet to send
+ */
+ void
+ sendOnFace(const Data& data);
/**
* @brief process the control-command Interest before authorization.
@@ -390,6 +436,18 @@
const Interest& interest,
const StatusDatasetHandler& handler);
+ /**
+ * @brief send a segment of StatusDataset
+ *
+ * @param dataName the name of this piece of data
+ * @param content the content of this piece of data
+ * @param imsFresh the freshness period of this piece of data in the in-memory storage
+ * @param isFinalBlock indicates whether this piece of data is the final block
+ */
+ void
+ sendStatusDatasetSegment(const Name& dataName, const Block& content,
+ time::milliseconds imsFresh, bool isFinalBlock);
+
void
postNotification(const Block& notification, const PartialName& relPrefix);
@@ -413,6 +471,9 @@
// NotificationStream name => next sequence number
std::unordered_map<Name, uint64_t> m_streams;
+
+NDN_CXX_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ util::InMemoryStorageFifo m_storage;
};
template<typename CP>
diff --git a/src/mgmt/status-dataset-context.cpp b/src/mgmt/status-dataset-context.cpp
index c3bc332..dbf6774 100644
--- a/src/mgmt/status-dataset-context.cpp
+++ b/src/mgmt/status-dataset-context.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2015 Regents of the University of California.
+ * Copyright (c) 2013-2016 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -75,7 +75,6 @@
m_state = State::RESPONDED;
size_t nBytesLeft = block.size();
-
while (nBytesLeft > 0) {
size_t nBytesAppend = std::min(nBytesLeft,
(ndn::MAX_NDN_PACKET_SIZE >> 1) - m_buffer->size());
@@ -83,9 +82,9 @@
nBytesLeft -= nBytesAppend;
if (nBytesLeft > 0) {
- const Block& content = makeBinaryBlock(tlv::Content, m_buffer->buf(), m_buffer->size());
- m_dataSender(Name(m_prefix).appendSegment(m_segmentNo++), content,
- MetaInfo().setFreshnessPeriod(m_expiry));
+ m_dataSender(Name(m_prefix).appendSegment(m_segmentNo++),
+ makeBinaryBlock(tlv::Content, m_buffer->buf(), m_buffer->size()),
+ m_expiry, false);
m_buffer = std::make_shared<EncodingBuffer>();
}
@@ -101,9 +100,9 @@
m_state = State::FINALIZED;
- auto dataName = Name(m_prefix).appendSegment(m_segmentNo++);
- m_dataSender(dataName, makeBinaryBlock(tlv::Content, m_buffer->buf(), m_buffer->size()),
- MetaInfo().setFreshnessPeriod(m_expiry).setFinalBlockId(dataName[-1]));
+ m_dataSender(Name(m_prefix).appendSegment(m_segmentNo),
+ makeBinaryBlock(tlv::Content, m_buffer->buf(), m_buffer->size()),
+ m_expiry, true);
}
void
@@ -115,14 +114,15 @@
m_state = State::FINALIZED;
- m_dataSender(m_interest.getName(), resp.wireEncode(),
- MetaInfo().setType(tlv::ContentType_Nack));
+ m_nackSender(resp);
}
StatusDatasetContext::StatusDatasetContext(const Interest& interest,
- const DataSender& dataSender)
+ const DataSender& dataSender,
+ const NackSender& nackSender)
: m_interest(interest)
, m_dataSender(dataSender)
+ , m_nackSender(nackSender)
, m_expiry(DEFAULT_STATUS_DATASET_FRESHNESS_PERIOD)
, m_buffer(make_shared<EncodingBuffer>())
, m_segmentNo(0)
diff --git a/src/mgmt/status-dataset-context.hpp b/src/mgmt/status-dataset-context.hpp
index 45778ea..42ad0c1 100644
--- a/src/mgmt/status-dataset-context.hpp
+++ b/src/mgmt/status-dataset-context.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2015 Regents of the University of California.
+ * Copyright (c) 2013-2016 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -92,16 +92,20 @@
reject(const ControlResponse& resp = ControlResponse().setCode(400));
NDN_CXX_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
- typedef std::function<void(const Name& dataName, const Block& content,
- const MetaInfo& metaInfo)> DataSender;
+ typedef std::function<void(const Name& dataName, const Block& content, time::milliseconds imsFresh,
+ bool isFinalBlock)> DataSender;
+ typedef std::function<void(const ControlResponse& resp)> NackSender;
- StatusDatasetContext(const Interest& interest, const DataSender& dataSender);
+ StatusDatasetContext(const Interest& interest,
+ const DataSender& dataSender,
+ const NackSender& nackSender);
private:
friend class Dispatcher;
const Interest& m_interest;
DataSender m_dataSender;
+ NackSender m_nackSender;
Name m_prefix;
time::milliseconds m_expiry;