blob: 40c1ecd9ac5d512068c3d7d0135900b9373b5b76 [file] [log] [blame]
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2013-2016, Regents of the University of California.
Alexander Afanasyevdfe58192013-01-17 17:34:04 -08004 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08005 * This file is part of ChronoShare, a decentralized file sharing application over NDN.
Alexander Afanasyevdfe58192013-01-17 17:34:04 -08006 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08007 * ChronoShare is free software: you can redistribute it and/or modify it under the terms
8 * of the GNU General Public License as published by the Free Software Foundation, either
9 * version 3 of the License, or (at your option) any later version.
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080010 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080011 * ChronoShare is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080014 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080015 * You should have received copies of the GNU General Public License along with
16 * ChronoShare, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * See AUTHORS.md for complete list of ChronoShare authors and contributors.
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080019 */
20
21#include "fetcher.h"
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -080022#include "fetch-manager.h"
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -070023#include "ndnx-pco.h"
Alexander Afanasyev678f3502013-01-23 17:09:37 -080024#include "logging.h"
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -080025
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080026#include <boost/make_shared.hpp>
27#include <boost/ref.hpp>
28#include <boost/throw_exception.hpp>
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080029#include <boost/date_time/posix_time/posix_time.hpp>
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080030
Alexander Afanasyev678f3502013-01-23 17:09:37 -080031INIT_LOGGER ("Fetcher");
32
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080033using namespace boost;
34using namespace std;
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -070035using namespace Ndnx;
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080036
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -070037Fetcher::Fetcher (Ndnx::NdnxWrapperPtr ndnx,
Zhenkai Zhuab9215c2013-01-28 23:42:28 -080038 ExecutorPtr executor,
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -080039 const SegmentCallback &segmentCallback,
40 const FinishCallback &finishCallback,
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080041 OnFetchCompleteCallback onFetchComplete, OnFetchFailedCallback onFetchFailed,
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -070042 const Ndnx::Name &deviceName, const Ndnx::Name &name, int64_t minSeqNo, int64_t maxSeqNo,
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080043 boost::posix_time::time_duration timeout/* = boost::posix_time::seconds (30)*/,
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -070044 const Ndnx::Name &forwardingHint/* = Ndnx::Name ()*/)
45 : m_ndnx (ndnx)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080046
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -080047 , m_segmentCallback (segmentCallback)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080048 , m_onFetchComplete (onFetchComplete)
49 , m_onFetchFailed (onFetchFailed)
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -080050 , m_finishCallback (finishCallback)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080051
Alexander Afanasyev83531a42013-01-19 16:21:54 -080052 , m_active (false)
Zhenkai Zhu354d46d2013-02-06 13:49:48 -080053 , m_timedwait (false)
Alexander Afanasyev49a18522013-01-18 17:49:04 -080054 , m_name (name)
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -080055 , m_deviceName (deviceName)
Alexander Afanasyev49a18522013-01-18 17:49:04 -080056 , m_forwardingHint (forwardingHint)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080057 , m_maximumNoActivityPeriod (timeout)
58
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -080059 , m_minSendSeqNo (minSeqNo-1)
60 , m_maxInOrderRecvSeqNo (minSeqNo-1)
Alexander Afanasyev49a18522013-01-18 17:49:04 -080061 , m_minSeqNo (minSeqNo)
62 , m_maxSeqNo (maxSeqNo)
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080063
Yingdi Yuf0b3de32013-07-11 12:59:00 -070064 , m_pipeline (1) // initial "congestion window"
Alexander Afanasyev50547892013-01-19 22:03:45 -080065 , m_activePipeline (0)
Yingdi Yuf0b3de32013-07-11 12:59:00 -070066
67 , m_rto(1) // for temporary congestion control, should be removed when NDN provide transport functionality.
68 , m_maxRto(static_cast<double>(timeout.total_seconds())/2)
69 , m_slowStart(true)
70 , m_threshold(4)
71 , m_roundCount(0)
72
Alexander Afanasyev548d38d2013-01-26 16:36:06 -080073 , m_retryPause (0)
Alexander Afanasyevf8b22bf2013-01-26 17:28:11 -080074 , m_nextScheduledRetry (date_time::second_clock<boost::posix_time::ptime>::universal_time ())
Zhenkai Zhuab9215c2013-01-28 23:42:28 -080075 , m_executor (executor) // must be 1
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080076{
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080077}
78
79Fetcher::~Fetcher ()
80{
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080081}
Alexander Afanasyev83531a42013-01-19 16:21:54 -080082
83void
84Fetcher::RestartPipeline ()
85{
86 m_active = true;
Alexander Afanasyev50547892013-01-19 22:03:45 -080087 m_minSendSeqNo = m_maxInOrderRecvSeqNo;
Alexander Afanasyev650ba282013-01-20 20:14:06 -080088 // cout << "Restart: " << m_minSendSeqNo << endl;
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080089 m_lastPositiveActivity = date_time::second_clock<boost::posix_time::ptime>::universal_time();
Alexander Afanasyev50547892013-01-19 22:03:45 -080090
Zhenkai Zhuab9215c2013-01-28 23:42:28 -080091 m_executor->execute (bind (&Fetcher::FillPipeline, this));
Alexander Afanasyev50547892013-01-19 22:03:45 -080092}
93
94void
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -070095Fetcher::SetForwardingHint (const Ndnx::Name &forwardingHint)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080096{
97 m_forwardingHint = forwardingHint;
98}
99
100void
Alexander Afanasyev50547892013-01-19 22:03:45 -0800101Fetcher::FillPipeline ()
102{
103 for (; m_minSendSeqNo < m_maxSeqNo && m_activePipeline < m_pipeline; m_minSendSeqNo++)
104 {
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800105 unique_lock<mutex> lock (m_seqNoMutex);
106
Alexander Afanasyev650ba282013-01-20 20:14:06 -0800107 if (m_outOfOrderRecvSeqNo.find (m_minSendSeqNo+1) != m_outOfOrderRecvSeqNo.end ())
108 continue;
109
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800110 if (m_inActivePipeline.find (m_minSendSeqNo+1) != m_inActivePipeline.end ())
111 continue;
112
113 m_inActivePipeline.insert (m_minSendSeqNo+1);
114
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800115 _LOG_DEBUG (" >>> i " << Name (m_forwardingHint)(m_name) << ", seq = " << (m_minSendSeqNo + 1 ));
Alexander Afanasyev678f3502013-01-23 17:09:37 -0800116
Alexander Afanasyev650ba282013-01-20 20:14:06 -0800117 // cout << ">>> " << m_minSendSeqNo+1 << endl;
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -0700118 m_ndnx->sendInterest (Name (m_forwardingHint)(m_name)(m_minSendSeqNo+1),
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800119 Closure (bind(&Fetcher::OnData, this, m_minSendSeqNo+1, _1, _2),
Zhenkai Zhuff4fa8a2013-01-28 22:02:40 -0800120 bind(&Fetcher::OnTimeout, this, m_minSendSeqNo+1, _1, _2, _3)),
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700121 Selectors().interestLifetime (m_rto)); // Alex: this lifetime should be changed to RTO
Alexander Afanasyevff8d9dc2013-01-26 00:45:08 -0800122 _LOG_DEBUG (" >>> i ok");
Alexander Afanasyev50547892013-01-19 22:03:45 -0800123
124 m_activePipeline ++;
125 }
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800126}
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800127
128void
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -0700129Fetcher::OnData (uint64_t seqno, const Ndnx::Name &name, PcoPtr data)
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800130{
Zhenkai Zhuab9215c2013-01-28 23:42:28 -0800131 m_executor->execute (bind (&Fetcher::OnData_Execute, this, seqno, name, data));
Alexander Afanasyevddc283c2013-01-28 23:11:20 -0800132}
133
134void
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -0700135Fetcher::OnData_Execute (uint64_t seqno, Ndnx::Name name, Ndnx::PcoPtr data)
Alexander Afanasyevddc283c2013-01-28 23:11:20 -0800136{
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800137 _LOG_DEBUG (" <<< d " << name.getPartialName (0, name.size () - 1) << ", seq = " << seqno);
138
Alexander Afanasyev66f4c492013-01-20 23:32:50 -0800139 if (m_forwardingHint == Name ())
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -0800140 {
Zhenkai Zhud5d99be2013-03-13 19:15:56 -0700141 // TODO: check verified!!!!
142 if (true)
Zhenkai Zhu5957c0d2013-02-08 13:47:52 -0800143 {
144 if (!m_segmentCallback.empty ())
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -0800145 {
146 m_segmentCallback (m_deviceName, m_name, seqno, data);
147 }
Zhenkai Zhu5957c0d2013-02-08 13:47:52 -0800148 }
149 else
150 {
151 _LOG_ERROR("Can not verify signature content. Name = " << data->name());
152 // probably needs to do more in the future
153 }
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -0800154 // we don't have to tell FetchManager about this
155 }
Alexander Afanasyev66f4c492013-01-20 23:32:50 -0800156 else
157 {
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -0700158 // in this case we don't care whether "data" is verified, in fact, we expect it is unverified
Alexander Afanasyev66f4c492013-01-20 23:32:50 -0800159 try {
Alexander Afanasyevf278db32013-01-21 14:41:01 -0800160 PcoPtr pco = make_shared<ParsedContentObject> (*data->contentPtr ());
Zhenkai Zhu79264a42013-02-07 21:49:42 -0800161
162 // we need to verify this pco and apply callback only when verified
Zhenkai Zhud5d99be2013-03-13 19:15:56 -0700163 // TODO: check verified !!!
164 if (true)
Zhenkai Zhu5957c0d2013-02-08 13:47:52 -0800165 {
166 if (!m_segmentCallback.empty ())
167 {
168 m_segmentCallback (m_deviceName, m_name, seqno, pco);
169 }
170 }
171 else
172 {
173 _LOG_ERROR("Can not verify signature content. Name = " << pco->name());
174 // probably needs to do more in the future
175 }
Alexander Afanasyev66f4c492013-01-20 23:32:50 -0800176 }
177 catch (MisformedContentObjectException &e)
178 {
179 cerr << "MisformedContentObjectException..." << endl;
180 // no idea what should do...
181 // let's ignore for now
182 }
183 }
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800184
Alexander Afanasyev50547892013-01-19 22:03:45 -0800185 m_activePipeline --;
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800186 m_lastPositiveActivity = date_time::second_clock<boost::posix_time::ptime>::universal_time();
Alexander Afanasyev50547892013-01-19 22:03:45 -0800187
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700188 {
189 unique_lock<mutex> lock (m_pipelineMutex);
190 if(m_slowStart){
191 m_pipeline++;
192 if(m_pipeline == m_threshold)
193 m_slowStart = false;
194 }
195 else{
196 m_roundCount++;
197 _LOG_DEBUG ("roundCount: " << m_roundCount);
198 if(m_roundCount == m_pipeline){
199 m_pipeline++;
200 m_roundCount = 0;
201 }
202 }
203 }
204
205 _LOG_DEBUG ("slowStart: " << boolalpha << m_slowStart << " pipeline: " << m_pipeline << " threshold: " << m_threshold);
206
207
Alexander Afanasyev50547892013-01-19 22:03:45 -0800208 ////////////////////////////////////////////////////////////////////////////
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800209 unique_lock<mutex> lock (m_seqNoMutex);
210
Alexander Afanasyev50547892013-01-19 22:03:45 -0800211 m_outOfOrderRecvSeqNo.insert (seqno);
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800212 m_inActivePipeline.erase (seqno);
213 _LOG_DEBUG ("Total segments received: " << m_outOfOrderRecvSeqNo.size ());
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -0800214 set<int64_t>::iterator inOrderSeqNo = m_outOfOrderRecvSeqNo.begin ();
Alexander Afanasyev50547892013-01-19 22:03:45 -0800215 for (; inOrderSeqNo != m_outOfOrderRecvSeqNo.end ();
216 inOrderSeqNo++)
217 {
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800218 _LOG_TRACE ("Checking " << *inOrderSeqNo << " and " << m_maxInOrderRecvSeqNo+1);
Alexander Afanasyev50547892013-01-19 22:03:45 -0800219 if (*inOrderSeqNo == m_maxInOrderRecvSeqNo+1)
220 {
221 m_maxInOrderRecvSeqNo = *inOrderSeqNo;
222 }
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800223 else if (*inOrderSeqNo < m_maxInOrderRecvSeqNo+1) // not possible anymore, but just in case
224 {
225 continue;
226 }
Alexander Afanasyev50547892013-01-19 22:03:45 -0800227 else
228 break;
229 }
230 m_outOfOrderRecvSeqNo.erase (m_outOfOrderRecvSeqNo.begin (), inOrderSeqNo);
231 ////////////////////////////////////////////////////////////////////////////
232
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800233 _LOG_TRACE ("Max in order received: " << m_maxInOrderRecvSeqNo << ", max seqNo to request: " << m_maxSeqNo);
234
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800235 if (m_maxInOrderRecvSeqNo == m_maxSeqNo)
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800236 {
Alexander Afanasyevbc8bf232013-01-29 11:19:17 -0800237 _LOG_TRACE ("Fetch finished: " << m_name);
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800238 m_active = false;
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -0800239 // invoke callback
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -0800240 if (!m_finishCallback.empty ())
241 {
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800242 _LOG_TRACE ("Notifying callback");
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -0800243 m_finishCallback(m_deviceName, m_name);
244 }
245
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -0800246 // tell FetchManager that we have finish our job
Zhenkai Zhu1f1def52013-01-29 00:01:03 -0800247 // m_onFetchComplete (*this);
248 // using executor, so we won't be deleted if there is scheduled FillPipeline call
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -0800249 if (!m_onFetchComplete.empty ())
250 {
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800251 m_timedwait = true;
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -0800252 m_executor->execute (bind (m_onFetchComplete, ref(*this), m_deviceName, m_name));
253 }
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800254 }
255 else
256 {
Zhenkai Zhuab9215c2013-01-28 23:42:28 -0800257 m_executor->execute (bind (&Fetcher::FillPipeline, this));
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800258 }
259}
260
Zhenkai Zhuff4fa8a2013-01-28 22:02:40 -0800261void
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -0700262Fetcher::OnTimeout (uint64_t seqno, const Ndnx::Name &name, const Closure &closure, Selectors selectors)
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800263{
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -0800264 _LOG_DEBUG (this << ", " << m_executor.get ());
Zhenkai Zhuab9215c2013-01-28 23:42:28 -0800265 m_executor->execute (bind (&Fetcher::OnTimeout_Execute, this, seqno, name, closure, selectors));
Alexander Afanasyevddc283c2013-01-28 23:11:20 -0800266}
267
268void
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -0700269Fetcher::OnTimeout_Execute (uint64_t seqno, Ndnx::Name name, Ndnx::Closure closure, Ndnx::Selectors selectors)
Alexander Afanasyevddc283c2013-01-28 23:11:20 -0800270{
Alexander Afanasyevff8d9dc2013-01-26 00:45:08 -0800271 _LOG_DEBUG (" <<< :( timeout " << name.getPartialName (0, name.size () - 1) << ", seq = " << seqno);
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800272
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800273 // cout << "Fetcher::OnTimeout: " << name << endl;
274 // cout << "Last: " << m_lastPositiveActivity << ", config: " << m_maximumNoActivityPeriod
275 // << ", now: " << date_time::second_clock<boost::posix_time::ptime>::universal_time()
276 // << ", oldest: " << (date_time::second_clock<boost::posix_time::ptime>::universal_time() - m_maximumNoActivityPeriod) << endl;
277
278 if (m_lastPositiveActivity <
279 (date_time::second_clock<boost::posix_time::ptime>::universal_time() - m_maximumNoActivityPeriod))
280 {
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800281 bool done = false;
282 {
283 unique_lock<mutex> lock (m_seqNoMutex);
284 m_inActivePipeline.erase (seqno);
285 m_activePipeline --;
286
287 if (m_activePipeline == 0)
288 {
289 done = true;
290 }
291 }
292
293 if (done)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800294 {
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800295 {
296 unique_lock<mutex> lock (m_seqNoMutex);
297 _LOG_DEBUG ("Telling that fetch failed");
298 _LOG_DEBUG ("Active pipeline size should be zero: " << m_inActivePipeline.size ());
299 }
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800300
Alexander Afanasyev650ba282013-01-20 20:14:06 -0800301 m_active = false;
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -0800302 if (!m_onFetchFailed.empty ())
303 {
304 m_onFetchFailed (ref (*this));
305 }
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800306 // this is not valid anymore, but we still should be able finish work
307 }
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800308 }
309 else
Alexander Afanasyevff8d9dc2013-01-26 00:45:08 -0800310 {
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800311 _LOG_DEBUG ("Asking to reexpress seqno: " << seqno);
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700312 {
313 unique_lock<mutex> lock (m_rtoMutex);
314 _LOG_DEBUG ("Interest Timeout");
315 m_rto = m_rto * 2;
316 if(m_rto > m_maxRto)
317 {
318 m_rto = m_maxRto;
319 _LOG_DEBUG ("RTO is max: " << m_rto);
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -0700320 }
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700321 else
322 {
323 _LOG_DEBUG ("RTO is doubled: " << m_rto);
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -0700324}
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700325 }
326
327 {
328 unique_lock<mutex> lock (m_pipelineMutex);
329 m_threshold = m_pipeline / 2;
330 m_pipeline = 1;
331 m_roundCount = 0;
332 m_slowStart = true;
333 }
334
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -0700335 m_ndnx->sendInterest (name, closure, selectors.interestLifetime (m_rto));
Alexander Afanasyevff8d9dc2013-01-26 00:45:08 -0800336 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800337}