mgmt: Use InMemoryStorage for StatusDataset and Notification produced by managers

Change-Id: I2ed57061e25638c01df7a0323ed303cafe1be455
ref: #2182
diff --git a/src/mgmt/dispatcher.cpp b/src/mgmt/dispatcher.cpp
index 5dbe5cd..3b121ea 100644
--- a/src/mgmt/dispatcher.cpp
+++ b/src/mgmt/dispatcher.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2013-2015 Regents of the University of California.
+ * Copyright (c) 2013-2016 Regents of the University of California.
  *
  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
  *
@@ -28,6 +28,8 @@
 namespace ndn {
 namespace mgmt {
 
+const time::milliseconds DEFAULT_FRESHNESS_PERIOD = time::milliseconds(1000);
+
 Authorization
 makeAcceptAllAuthorization()
 {
@@ -41,10 +43,12 @@
 }
 
 Dispatcher::Dispatcher(Face& face, security::KeyChain& keyChain,
-                       const security::SigningInfo& signingInfo)
+                       const security::SigningInfo& signingInfo,
+                       size_t imsCapacity)
   : m_face(face)
   , m_keyChain(keyChain)
   , m_signingInfo(signingInfo)
+  , m_storage(m_face.getIoService(), imsCapacity)
 {
 }
 
@@ -147,16 +151,46 @@
 }
 
 void
-Dispatcher::sendData(const Name& dataName, const Block& content,
-                     const MetaInfo& metaInfo)
+Dispatcher::queryStorage(const Name& prefix, const Interest& interest,
+                         const InterestHandler& missContinuation)
+{
+  auto data = m_storage.find(interest);
+  if (data == nullptr) {
+    // invoke missContinuation to process this Interest if the query fails.
+    missContinuation(prefix, interest);
+  }
+  else {
+    // send the fetched data through the face if the query succeeds.
+    sendOnFace(*data);
+  }
+}
+
+void
+Dispatcher::sendData(const Name& dataName, const Block& content, const MetaInfo& metaInfo,
+                     SendDestination option, time::milliseconds imsFresh)
 {
   shared_ptr<Data> data = make_shared<Data>(dataName);
-  data->setContent(content).setMetaInfo(metaInfo);
+  data->setContent(content).setMetaInfo(metaInfo).setFreshnessPeriod(DEFAULT_FRESHNESS_PERIOD);
 
   m_keyChain.sign(*data, m_signingInfo);
 
+  if (option == SendDestination::IMS || option == SendDestination::FACE_AND_IMS) {
+    lp::CachePolicy policy;
+    policy.setPolicy(lp::CachePolicyType::NO_CACHE);
+    data->setTag(make_shared<lp::CachePolicyTag>(policy));
+    m_storage.insert(*data, imsFresh);
+  }
+
+  if (option == SendDestination::FACE || option == SendDestination::FACE_AND_IMS) {
+    sendOnFace(*data);
+  }
+}
+
+void
+Dispatcher::sendOnFace(const Data& data)
+{
   try {
-    m_face.put(*data);
+    m_face.put(data);
   }
   catch (Face::Error& e) {
 #ifdef NDN_CXX_MGMT_DISPATCHER_ENABLE_LOGGING
@@ -210,14 +244,15 @@
 
 void
 Dispatcher::sendControlResponse(const ControlResponse& resp, const Interest& interest,
-                                bool isNack/*= false*/)
+                                bool isNack)
 {
-  MetaInfo info;
+  MetaInfo metaInfo;
   if (isNack) {
-    info.setType(tlv::ContentType_Nack);
+    metaInfo.setType(tlv::ContentType_Nack);
   }
-
-  sendData(interest.getName(), resp.wireEncode(), info);
+  // control response is always sent out through the face
+  sendData(interest.getName(), resp.wireEncode(), metaInfo, SendDestination::FACE,
+           DEFAULT_FRESHNESS_PERIOD);
 }
 
 void
@@ -238,8 +273,11 @@
          _1, _2, _3, handler);
   AuthorizationRejectedCallback rejected =
     bind(&Dispatcher::afterAuthorizationRejected, this, _1, _2);
-  m_handlers[relPrefix] = bind(&Dispatcher::processStatusDatasetInterest, this,
-                               _1, _2, authorization, accepted, rejected);
+
+  // on a storage miss, fall back to the regular processStatusDatasetInterest path
+  InterestHandler missContinuation = bind(&Dispatcher::processStatusDatasetInterest, this,
+                                          _1, _2, authorization, accepted, rejected);
+  m_handlers[relPrefix] = bind(&Dispatcher::queryStorage, this, _1, _2, missContinuation);
 }
 
 void
@@ -267,10 +305,31 @@
                                                    const Interest& interest,
                                                    const StatusDatasetHandler& handler)
 {
-  StatusDatasetContext context(interest, bind(&Dispatcher::sendData, this, _1, _2, _3));
+  StatusDatasetContext context(interest,
+                               bind(&Dispatcher::sendStatusDatasetSegment, this, _1, _2, _3, _4),
+                               bind(&Dispatcher::sendControlResponse, this, _1, interest, true));
   handler(prefix, interest, context);
 }
 
+void
+Dispatcher::sendStatusDatasetSegment(const Name& dataName, const Block& content,
+                                     time::milliseconds imsFresh, bool isFinalBlock)
+{
+  // the first segment is sent to both places (the face and the in-memory storage);
+  // all other segments are inserted into the in-memory storage only
+  auto destination = SendDestination::IMS;
+  if (dataName[-1].toSegment() == 0) {
+    destination = SendDestination::FACE_AND_IMS;
+  }
+
+  MetaInfo metaInfo;
+  if (isFinalBlock) {
+    metaInfo.setFinalBlockId(dataName[-1]);
+  }
+
+  sendData(dataName, content, metaInfo, destination, imsFresh);
+}
+
 PostNotification
 Dispatcher::addNotificationStream(const PartialName& relPrefix)
 {
@@ -282,6 +341,11 @@
     throw std::out_of_range("relPrefix overlaps with another relPrefix");
   }
 
+  // keep silent if Interest does not match a stored notification
+  InterestHandler missContinuation = bind([]{});
+
+  // register a handler for the subscriber of this notification stream
+  m_handlers[relPrefix] = bind(&Dispatcher::queryStorage, this, _1, _2, missContinuation);
   m_streams[relPrefix] = 0;
   return bind(&Dispatcher::postNotification, this, _1, relPrefix);
 }
@@ -299,7 +363,11 @@
   Name streamName(m_topLevelPrefixes.begin()->second.topPrefix);
   streamName.append(relPrefix);
   streamName.appendSequenceNumber(m_streams[streamName]++);
-  sendData(streamName, notification, MetaInfo());
+
+  // notification is sent out by the face after being inserted into the in-memory storage,
+  // because a request may be pending in the PIT
+  sendData(streamName, notification, MetaInfo(), SendDestination::FACE_AND_IMS,
+           DEFAULT_FRESHNESS_PERIOD);
 }
 
 } // namespace mgmt