Adding implementation of Interest Queue

Change-Id: I4a11db79053e69a19ceba0e0501b93911131d3a2
Refs: #3612
diff --git a/src/interest-queue.cpp b/src/interest-queue.cpp
new file mode 100644
index 0000000..062e222
--- /dev/null
+++ b/src/interest-queue.cpp
@@ -0,0 +1,43 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+* Copyright (c) 2016 Regents of the University of California.
+*
+* This file is part of the nTorrent codebase.
+*
+* nTorrent is free software: you can redistribute it and/or modify it under the
+* terms of the GNU Lesser General Public License as published by the Free Software
+* Foundation, either version 3 of the License, or (at your option) any later version.
+*
+* nTorrent is distributed in the hope that it will be useful, but WITHOUT ANY
+* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+*
+* You should have received copies of the GNU General Public License and GNU Lesser
+* General Public License along with nTorrent, e.g., in COPYING.md file. If not, see
+* <http://www.gnu.org/licenses/>.
+*
+* See AUTHORS for complete list of nTorrent authors and contributors.
+*/
+
+#include "interest-queue.hpp"
+
+namespace ndn {
+namespace ntorrent {
+
+void
+InterestQueue::push(shared_ptr<Interest> interest, DataCallback dataReceivedCallback,
+     TimeoutCallback dataFailedCallback)
+{
+  m_queue.push(std::make_tuple(interest, dataReceivedCallback, dataFailedCallback));
+}
+
+queueTuple
+InterestQueue::pop()
+{
+  queueTuple tup = m_queue.front();
+  m_queue.pop();
+  return tup;
+}
+
+} // namespace ntorrent
+} // namespace ndn
diff --git a/src/interest-queue.hpp b/src/interest-queue.hpp
new file mode 100644
index 0000000..6600297
--- /dev/null
+++ b/src/interest-queue.hpp
@@ -0,0 +1,106 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+* Copyright (c) 2016 Regents of the University of California.
+*
+* This file is part of the nTorrent codebase.
+*
+* nTorrent is free software: you can redistribute it and/or modify it under the
+* terms of the GNU Lesser General Public License as published by the Free Software
+* Foundation, either version 3 of the License, or (at your option) any later version.
+*
+* nTorrent is distributed in the hope that it will be useful, but WITHOUT ANY
+* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+*
+* You should have received copies of the GNU General Public License and GNU Lesser
+* General Public License along with nTorrent, e.g., in COPYING.md file. If not, see
+* <http://www.gnu.org/licenses/>.
+*
+* See AUTHORS for complete list of nTorrent authors and contributors.
+*/
+
+#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/interest.hpp>
+
+#include <queue>
+#include <tuple>
+
+typedef std::tuple<std::shared_ptr<ndn::Interest>, ndn::DataCallback, ndn::TimeoutCallback> queueTuple;
+
+namespace ndn {
+namespace ntorrent {
+
+class InterestQueue
+{
+public:
+  InterestQueue() = default;
+
+  ~InterestQueue() = default;
+
+  /**
+   * @brief Push a tuple to the Interest Queue
+   * @param interest A shared pointer to an Interest
+   * @param dataReceivedCallback Callback to be called when data is received for the given
+   *                             Interest
+   * @param dataFailedCallback Callback to be called when we fail to retrieve data for the
+   *                           given Interest
+   *
+   */
+  void
+  push(shared_ptr<Interest> interest, DataCallback dataReceivedCallback,
+       TimeoutCallback dataFailedCallback);
+
+  /**
+   * @brief Pop a tuple from the Interest Queue
+   * @return A tuple of a shared pointer to an Interest, a callaback for successful data
+   *         retrieval and a callback for failed data retrieval
+   */
+  queueTuple
+  pop();
+
+  /**
+   * @brief Return the size of the queue (number of tuples)
+   * @return The number of tuples stored in the queue
+   */
+  size_t
+  size() const;
+
+  /**
+   * @brief Check if the queue is empty
+   * @return True if the queue is empty, otherwise false
+   */
+  bool
+  empty() const;
+
+  /**
+   * @brief Return the top element of the Interest queue
+   * @return The top tuple element of the Interest queue
+   */
+   queueTuple
+   front() const;
+
+private:
+  std::queue<queueTuple> m_queue;
+};
+
+inline size_t
+InterestQueue::size() const
+{
+  return m_queue.size();
+}
+
+inline bool
+InterestQueue::empty() const
+{
+  return m_queue.empty();
+}
+
+inline queueTuple
+InterestQueue::front() const
+{
+  return m_queue.front();
+}
+
+} // namespace ntorrent
+} // namespace ndn
diff --git a/src/sequential-data-fetcher.cpp b/src/sequential-data-fetcher.cpp
index 472cd71..758f6f7 100644
--- a/src/sequential-data-fetcher.cpp
+++ b/src/sequential-data-fetcher.cpp
@@ -170,14 +170,6 @@
     this->downloadManifestFiles({ interest.getName() });
   }
   else if (nameType == IoUtil::DATA_PACKET) {
-    LOG_ERROR << "Torrent File Segment Downloading Failed: " << interest.getName();
-    this->downloadTorrentFile();
-  }
-  else if (nameType == IoUtil::FILE_MANIFEST) {
-    LOG_ERROR << "Manifest File Segment Downloading Failed: " << interest.getName();
-    this->downloadManifestFiles({ interest.getName() });
-  }
-  else if (nameType == IoUtil::DATA_PACKET) {
     LOG_ERROR << "Data Packet Downloading Failed: " << interest.getName();
     this->downloadPackets({ interest.getName() });
   }
diff --git a/src/torrent-manager.cpp b/src/torrent-manager.cpp
index 179893e..110d90f 100644
--- a/src/torrent-manager.cpp
+++ b/src/torrent-manager.cpp
@@ -403,13 +403,16 @@
       if (nextSegmentPtr != nullptr) {
         this->downloadTorrentFileSegment(*nextSegmentPtr, path, onSuccess, onFailed);
       }
-      if (!m_seedFlag && m_pendingInterests.empty()) {
+      this->sendInterest();
+      if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
         shutdown();
+        return;
       }
   };
 
   auto dataFailed = [path, name, onSuccess, onFailed, this]
                                                 (const Interest& interest) {
+    m_pendingInterests.erase(interest.getName());
     ++m_retries;
     if (m_retries >= MAX_NUM_OF_RETRIES) {
       ++m_stats_table_iter;
@@ -417,14 +420,15 @@
         m_stats_table_iter = m_statsTable.begin();
       }
     }
+    this->sendInterest();
     if (onFailed) {
       onFailed(interest.getName(), "Unknown error");
     }
     this->downloadTorrentFileSegment(name, path, onSuccess, onFailed);
   };
-  m_pendingInterests.insert(interest->getName());
-  LOG_DEBUG << "Sending: " << *interest << std::endl;
-  m_face->expressInterest(*interest, dataReceived, dataFailed);
+  LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
+  m_interestQueue->push(interest, dataReceived, dataFailed);
+  this->sendInterest();
 }
 
 void
@@ -485,13 +489,16 @@
     m_stats_table_iter->incrementReceivedData();
     m_retries = 0;
     onSuccess(data.getName());
-    if (!m_seedFlag && m_pendingInterests.empty()) {
+    this->sendInterest();
+    if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
       shutdown();
     }
   };
+
   auto dataFailed = [onFailed, this]
                              (const Interest& interest) {
     m_retries++;
+    m_pendingInterests.erase(interest.getName());
     if (m_retries >= MAX_NUM_OF_RETRIES) {
       m_stats_table_iter++;
       if (m_stats_table_iter == m_statsTable.end()) {
@@ -499,10 +506,11 @@
       }
     }
     onFailed(interest.getName(), "Unknown failure");
+    this->sendInterest();
   };
-  m_pendingInterests.insert(interest->getName());
-  LOG_DEBUG << "Sending: " <<  *interest << std::endl;
-  m_face->expressInterest(*interest, dataReceived, dataFailed);
+  LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
+  m_interestQueue->push(interest, dataReceived, dataFailed);
+  this->sendInterest();
 }
 
 void TorrentManager::seed(const Data& data) {
@@ -639,13 +647,16 @@
     else {
       onSuccess(*packetNames);
     }
-    if (!m_seedFlag && m_pendingInterests.empty()) {
+    this->sendInterest();
+    if (m_pendingInterests.empty() && m_interestQueue->empty() && !m_seedFlag) {
       shutdown();
+      return;
     }
   };
 
   auto dataFailed = [packetNames, path, manifestName, onFailed, this]
                                                 (const Interest& interest) {
+    m_pendingInterests.erase(interest.getName());
     m_retries++;
     if (m_retries >= MAX_NUM_OF_RETRIES) {
       m_stats_table_iter++;
@@ -653,17 +664,18 @@
         m_stats_table_iter = m_statsTable.begin();
     }
     onFailed(interest.getName(), "Unknown failure");
+    this->sendInterest();
   };
-  m_pendingInterests.insert(interest->getName());
-  LOG_DEBUG << "Sending: " <<  *interest << std::endl;
-  m_face->expressInterest(*interest, dataReceived, dataFailed);
+  LOG_DEBUG << "Pushing to the Interest Queue: " << *interest << std::endl;
+  m_interestQueue->push(interest, dataReceived, dataFailed);
+  this->sendInterest();
 }
 
 void
 TorrentManager::onInterestReceived(const InterestFilter& filter, const Interest& interest)
 {
   // handle if it is a torrent-file
-  LOG_DEBUG << "Interest Recevied: " << interest << std::endl;
+  LOG_DEBUG << "Interest Received: " << interest << std::endl;
   const auto& interestName = interest.getName();
   std::shared_ptr<Data> data = nullptr;
   auto cmp = [&interestName](const Data& t){return t.getFullName() == interestName;};
@@ -764,5 +776,19 @@
   return interest;
 }
 
+void
+TorrentManager::sendInterest()
+{
+  auto nackCallBack = [](const Interest& i, const lp::Nack& n) {
+    LOG_ERROR << "Nack received: " << n.getReason() << ": " << i << std::endl;
+   };
+  while (m_pendingInterests.size() < WINDOW_SIZE && !m_interestQueue->empty()) {
+    queueTuple tup = m_interestQueue->pop();
+    m_pendingInterests.insert(std::get<0>(tup)->getName());
+    LOG_DEBUG << "Sending: " <<  *(std::get<0>(tup)) << std::endl;
+    m_face->expressInterest(*std::get<0>(tup), std::get<1>(tup), nackCallBack, std::get<2>(tup));
+  }
+}
+
 }  // end ntorrent
 }  // end ndn
diff --git a/src/torrent-manager.hpp b/src/torrent-manager.hpp
index 20fdb02..12096c3 100644
--- a/src/torrent-manager.hpp
+++ b/src/torrent-manager.hpp
@@ -22,6 +22,7 @@
 #define INCLUDED_TORRENT_FILE_MANAGER_H
 
 #include "file-manifest.hpp"
+#include "interest-queue.hpp"
 #include "torrent-file.hpp"
 #include "update-handler.hpp"
 
@@ -288,7 +289,9 @@
     // Number of times to retry if a routable prefix fails to retrieve data
     MAX_NUM_OF_RETRIES = 5,
     // Number of Interests to be sent before sorting the stats table
-    SORTING_INTERVAL = 100
+    SORTING_INTERVAL = 100,
+    // Maximum window size used for sending new Interests out
+    WINDOW_SIZE = 50
   };
 
   void onDataReceived(const Data& data);
@@ -320,6 +323,10 @@
   shared_ptr<Interest>
   createInterest(Name name);
 
+  void
+  sendInterest();
+
+
   // A flag to determine if upon completion we should continue seeding
   bool                                                                m_seedFlag;
   // Face used for network communication
@@ -336,6 +343,8 @@
   shared_ptr<KeyChain>                                                m_keyChain;
 
   std::unordered_set<ndn::Name>                                       m_pendingInterests;
+
+  shared_ptr<InterestQueue>                                           m_interestQueue;
   // TODO(spyros) Fix and reintegrate update handler
   // // Update Handler instance
   // shared_ptr<UpdateHandler>                                           m_updateHandler;
@@ -357,6 +366,8 @@
 , m_sortingCounter(0)
 , m_keyChain(new KeyChain())
 {
+  m_interestQueue = make_shared<InterestQueue>();
+
   if(face == nullptr) {
     m_face = make_shared<Face>();
   }
diff --git a/src/util/logging.hpp b/src/util/logging.hpp
index e07626d..d10e926 100644
--- a/src/util/logging.hpp
+++ b/src/util/logging.hpp
@@ -28,7 +28,7 @@
 #include <boost/log/sources/severity_logger.hpp>
 #include <boost/log/trivial.hpp>
 
-enum { SEVERITY_THRESHOLD = boost::log::trivial::trace };
+enum { SEVERITY_THRESHOLD = boost::log::trivial::debug };
 
 // register a global logger
 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(logger, boost::log::sources::severity_logger_mt<boost::log::trivial::severity_level>)