blob: 6cc9583654bd4f0d152a777af958d3e01c14528f [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2018, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "write-handle.hpp"
21
weijia yuan82cf9142018-10-21 12:25:02 -070022#include <ndn-cxx/util/random.hpp>
23
Shuo Chen29c77fe2014-03-18 11:29:41 -070024namespace repo {
25
26static const int RETRY_TIMEOUT = 3;
27static const int DEFAULT_CREDIT = 12;
weijia yuan82cf9142018-10-21 12:25:02 -070028static const milliseconds NOEND_TIMEOUT(10000_ms);
29static const milliseconds PROCESS_DELETE_TIME(10000_ms);
30static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
Shuo Chen29c77fe2014-03-18 11:29:41 -070031
weijia yuan82cf9142018-10-21 12:25:02 -070032WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, ndn::mgmt::Dispatcher& dispatcher,
33 Scheduler& scheduler, Validator& validator)
34 : CommandBaseHandle(face, storageHandle, scheduler, validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070035 , m_validator(validator)
36 , m_retryTime(RETRY_TIMEOUT)
37 , m_credit(DEFAULT_CREDIT)
38 , m_noEndTimeout(NOEND_TIMEOUT)
Weiqi Shi098f91c2014-07-23 17:41:35 -070039 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
Shuo Chen29c77fe2014-03-18 11:29:41 -070040{
weijia yuan82cf9142018-10-21 12:25:02 -070041 dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert"),
42 makeAuthorization(),
43 std::bind(&WriteHandle::validateParameters<InsertCommand>, this, _1),
44 std::bind(&WriteHandle::handleInsertCommand, this, _1, _2, _3, _4));
45
46 dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert check"),
47 makeAuthorization(),
48 std::bind(&WriteHandle::validateParameters<InsertCheckCommand>, this, _1),
49 std::bind(&WriteHandle::handleCheckCommand, this, _1, _2, _3, _4));
Shuo Chen29c77fe2014-03-18 11:29:41 -070050}
51
52void
53WriteHandle::deleteProcess(ProcessId processId)
54{
55 m_processes.erase(processId);
56}
57
Shuo Chen29c77fe2014-03-18 11:29:41 -070058void
weijia yuan82cf9142018-10-21 12:25:02 -070059WriteHandle::handleInsertCommand(const Name& prefix, const Interest& interest,
60 const ndn::mgmt::ControlParameters& parameter,
61 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -070062{
weijia yuan82cf9142018-10-21 12:25:02 -070063 RepoCommandParameter* repoParameter =
64 dynamic_cast<RepoCommandParameter*>(const_cast<ndn::mgmt::ControlParameters*>(&parameter));
Shuo Chen29c77fe2014-03-18 11:29:41 -070065
weijia yuan82cf9142018-10-21 12:25:02 -070066 if (repoParameter->hasStartBlockId() || repoParameter->hasEndBlockId()) {
67 if (repoParameter->hasSelectors()) {
68 done(negativeReply("BlockId present. BlockId is not supported in this protocol", 402));
Shuo Chen29c77fe2014-03-18 11:29:41 -070069 return;
70 }
weijia yuan82cf9142018-10-21 12:25:02 -070071 processSegmentedInsertCommand(interest, *repoParameter, done);
Shuo Chen29c77fe2014-03-18 11:29:41 -070072 }
73 else {
weijia yuan82cf9142018-10-21 12:25:02 -070074 processSingleInsertCommand(interest, *repoParameter, done);
Shuo Chen29c77fe2014-03-18 11:29:41 -070075 }
weijia yuan82cf9142018-10-21 12:25:02 -070076 if (repoParameter->hasInterestLifetime())
77 m_interestLifetime = repoParameter->getInterestLifetime();
Shuo Chen29c77fe2014-03-18 11:29:41 -070078}
79
80void
Alexander Afanasyev42290b22017-03-09 12:58:29 -080081WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -070082{
Shuo Chenc88c87d2014-06-25 20:21:02 +080083 m_validator.validate(data,
84 bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
85 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
86}
87
88void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040089WriteHandle::onDataValidated(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chenc88c87d2014-06-25 20:21:02 +080090{
Shuo Chen29c77fe2014-03-18 11:29:41 -070091 if (m_processes.count(processId) == 0) {
92 return;
93 }
94
95 ProcessInfo& process = m_processes[processId];
96 RepoCommandResponse& response = process.response;
97
98 if (response.getInsertNum() == 0) {
weijia yuan82cf9142018-10-21 12:25:02 -070099 storageHandle.insertData(data);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700100 response.setInsertNum(1);
101 }
102
103 deferredDeleteProcess(processId);
104}
105
106void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400107WriteHandle::onDataValidationFailed(const Data& data, const ValidationError& error)
Shuo Chenc88c87d2014-06-25 20:21:02 +0800108{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400109 std::cerr << error << std::endl;
Shuo Chenc88c87d2014-06-25 20:21:02 +0800110}
111
112void
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800113WriteHandle::onSegmentData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700114{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800115 m_validator.validate(data,
116 bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId),
117 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
118}
119
120void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400121WriteHandle::onSegmentDataValidated(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chenc88c87d2014-06-25 20:21:02 +0800122{
Davide Pesavento0c139512018-11-03 18:23:38 -0400123 auto it = m_processes.find(processId);
124 if (it == m_processes.end()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700125 return;
126 }
Davide Pesavento0c139512018-11-03 18:23:38 -0400127 RepoCommandResponse& response = it->second.response;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700128
129 //refresh endBlockId
Davide Pesavento0c139512018-11-03 18:23:38 -0400130 auto finalBlock = data.getFinalBlock();
131 if (finalBlock && finalBlock->isSegment()) {
132 auto finalSeg = finalBlock->toSegment();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700133 if (response.hasEndBlockId()) {
Davide Pesavento0c139512018-11-03 18:23:38 -0400134 if (finalSeg < response.getEndBlockId()) {
135 response.setEndBlockId(finalSeg);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700136 }
137 }
138 else {
Davide Pesavento0c139512018-11-03 18:23:38 -0400139 response.setEndBlockId(finalSeg);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700140 }
141 }
142
143 //insert data
weijia yuan82cf9142018-10-21 12:25:02 -0700144 if (storageHandle.insertData(data)) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700145 response.setInsertNum(response.getInsertNum() + 1);
146 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700147
148 onSegmentDataControl(processId, interest);
149}
150
151void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700152WriteHandle::onTimeout(const Interest& interest, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700153{
Shuo Chen028dcd32014-06-21 16:36:44 +0800154 std::cerr << "Timeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700155 m_processes.erase(processId);
156}
157
158void
159WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
160{
Shuo Chen028dcd32014-06-21 16:36:44 +0800161 std::cerr << "SegTimeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700162
163 onSegmentTimeoutControl(processId, interest);
164}
165
166void
Shuo Chen29c77fe2014-03-18 11:29:41 -0700167WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
168{
169 ProcessInfo& process = m_processes[processId];
170 process.credit = 0;
171
172 map<SegmentNo, int>& processRetry = process.retryCounts;
173
174 Name name = parameter.getName();
175 SegmentNo startBlockId = parameter.getStartBlockId();
176
177 uint64_t initialCredit = m_credit;
178
179 if (parameter.hasEndBlockId()) {
180 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800181 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700182 }
183 else {
184 // set noEndTimeout timer
185 process.noEndTime = ndn::time::steady_clock::now() +
186 m_noEndTimeout;
187 }
188 process.credit = initialCredit;
189 SegmentNo segment = startBlockId;
Weiqi Shi098f91c2014-07-23 17:41:35 -0700190
Shuo Chen29c77fe2014-03-18 11:29:41 -0700191 for (; segment < startBlockId + initialCredit; ++segment) {
192 Name fetchName = name;
193 fetchName.appendSegment(segment);
194 Interest interest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700195 interest.setInterestLifetime(m_interestLifetime);
weijia yuan82cf9142018-10-21 12:25:02 -0700196 face.expressInterest(interest,
Shuo Chen29c77fe2014-03-18 11:29:41 -0700197 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800198 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700199 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
200 process.credit--;
201 processRetry[segment] = 0;
202 }
203
204 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
205
Shuo Chen29c77fe2014-03-18 11:29:41 -0700206 process.nextSegment = segment;
207 nextSegmentQueue.push(segment);
208}
209
210void
211WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
212{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700213 if (m_processes.count(processId) == 0) {
214 return;
215 }
216 ProcessInfo& process = m_processes[processId];
217 RepoCommandResponse& response = process.response;
218 int& processCredit = process.credit;
219 //onSegmentDataControl is called when a data returns.
220 //When data returns, processCredit++
221 processCredit++;
222 SegmentNo& nextSegment = process.nextSegment;
223 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
224 map<SegmentNo, int>& retryCounts = process.retryCounts;
225
226 //read whether notime timeout
227 if (!response.hasEndBlockId()) {
228
229 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
230 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
231
232 if (now > noEndTime) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800233 std::cerr << "noEndtimeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700234 //m_processes.erase(processId);
235 //StatusCode should be refreshed as 405
weijia yuan82cf9142018-10-21 12:25:02 -0700236 response.setCode(405);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700237 //schedule a delete event
238 deferredDeleteProcess(processId);
239 return;
240 }
241 }
242
243 //read whether this process has total ends, if ends, remove control info from the maps
244 if (response.hasEndBlockId()) {
245 uint64_t nSegments =
246 response.getEndBlockId() - response.getStartBlockId() + 1;
247 if (response.getInsertNum() >= nSegments) {
248 //m_processes.erase(processId);
249 //All the data has been inserted, StatusCode is refreshed as 200
weijia yuan82cf9142018-10-21 12:25:02 -0700250 response.setCode(200);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700251 deferredDeleteProcess(processId);
252 return;
253 }
254 }
255
256 //check whether there is any credit
257 if (processCredit == 0)
258 return;
259
260
261 //check whether sent queue empty
262 if (nextSegmentQueue.empty()) {
263 //do not do anything
264 return;
265 }
266
267 //pop the queue
268 SegmentNo sendingSegment = nextSegmentQueue.front();
269 nextSegmentQueue.pop();
270
271 //check whether sendingSegment exceeds
Shuo Chenccfbe242014-04-29 23:57:51 +0800272 if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700273 //do not do anything
274 return;
275 }
276
277 //read whether this is retransmitted data;
278 SegmentNo fetchedSegment =
279 interest.getName().get(interest.getName().size() - 1).toSegment();
280
281 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
282
283 //find this fetched data, remove it from this map
284 //rit->second.erase(oit);
285 retryCounts.erase(fetchedSegment);
286 //express the interest of the top of the queue
287 Name fetchName(interest.getName().getPrefix(-1));
288 fetchName.appendSegment(sendingSegment);
289 Interest fetchInterest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700290 fetchInterest.setInterestLifetime(m_interestLifetime);
weijia yuan82cf9142018-10-21 12:25:02 -0700291 face.expressInterest(fetchInterest,
Shuo Chen29c77fe2014-03-18 11:29:41 -0700292 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800293 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700294 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
295 //When an interest is expressed, processCredit--
296 processCredit--;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700297 if (retryCounts.count(sendingSegment) == 0) {
298 //not found
299 retryCounts[sendingSegment] = 0;
300 }
301 else {
302 //found
303 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
304 }
305 //increase the next seg and put it into the queue
Shuo Chenccfbe242014-04-29 23:57:51 +0800306 if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700307 nextSegment++;
308 nextSegmentQueue.push(nextSegment);
309 }
310}
311
312void
313WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
314{
315 if (m_processes.count(processId) == 0) {
316 return;
317 }
318 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700319 // RepoCommandResponse& response = process.response;
320 // SegmentNo& nextSegment = process.nextSegment;
321 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700322 map<SegmentNo, int>& retryCounts = process.retryCounts;
323
324 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
325
Shuo Chen028dcd32014-06-21 16:36:44 +0800326 std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700327
328 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
329
330 //read the retry time. If retry out of time, fail the process. if not, plus
331 int& retryTime = retryCounts[timeoutSegment];
332 if (retryTime >= m_retryTime) {
333 //fail this process
Shuo Chen028dcd32014-06-21 16:36:44 +0800334 std::cerr << "Retry timeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700335 m_processes.erase(processId);
336 return;
337 }
338 else {
339 //Reput it in the queue, retryTime++
340 retryTime++;
341 Interest retryInterest(interest.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700342 retryInterest.setInterestLifetime(m_interestLifetime);
weijia yuan82cf9142018-10-21 12:25:02 -0700343 face.expressInterest(retryInterest,
Shuo Chen29c77fe2014-03-18 11:29:41 -0700344 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800345 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700346 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
347 }
348
349}
350
351void
weijia yuan82cf9142018-10-21 12:25:02 -0700352WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
353 const ndn::mgmt::ControlParameters& parameter,
354 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700355{
weijia yuan82cf9142018-10-21 12:25:02 -0700356 const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700357
Shuo Chen29c77fe2014-03-18 11:29:41 -0700358 //check whether this process exists
weijia yuan82cf9142018-10-21 12:25:02 -0700359 ProcessId processId = repoParameter.getProcessId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700360 if (m_processes.count(processId) == 0) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800361 std::cerr << "no such processId: " << processId << std::endl;
weijia yuan82cf9142018-10-21 12:25:02 -0700362 done(negativeReply("No such this process is in progress", 404));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700363 return;
364 }
365
366 ProcessInfo& process = m_processes[processId];
367
368 RepoCommandResponse& response = process.response;
369
370 //Check whether it is single data fetching
371 if (!response.hasStartBlockId() &&
372 !response.hasEndBlockId()) {
weijia yuan82cf9142018-10-21 12:25:02 -0700373 //reply(interest, response);
374 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700375 return;
376 }
377
378 //read if noEndtimeout
379 if (!response.hasEndBlockId()) {
380 extendNoEndTime(process);
weijia yuan82cf9142018-10-21 12:25:02 -0700381 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700382 return;
383 }
384 else {
weijia yuan82cf9142018-10-21 12:25:02 -0700385 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700386 }
387}
388
389void
Shuo Chen29c77fe2014-03-18 11:29:41 -0700390WriteHandle::deferredDeleteProcess(ProcessId processId)
391{
weijia yuan82cf9142018-10-21 12:25:02 -0700392 scheduler.scheduleEvent(PROCESS_DELETE_TIME,
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800393 bind(&WriteHandle::deleteProcess, this, processId));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700394}
395
396void
weijia yuan82cf9142018-10-21 12:25:02 -0700397WriteHandle::processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
398 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700399{
weijia yuan82cf9142018-10-21 12:25:02 -0700400 ProcessId processId = ndn::random::generateWord64();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700401
402 ProcessInfo& process = m_processes[processId];
403
404 RepoCommandResponse& response = process.response;
weijia yuan82cf9142018-10-21 12:25:02 -0700405 response.setCode(100);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700406 response.setProcessId(processId);
407 response.setInsertNum(0);
weijia yuan82cf9142018-10-21 12:25:02 -0700408 response.setBody(response.wireEncode());
409 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700410
weijia yuan82cf9142018-10-21 12:25:02 -0700411 response.setCode(300);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700412
413 Interest fetchInterest(parameter.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700414 fetchInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700415 if (parameter.hasSelectors()) {
416 fetchInterest.setSelectors(parameter.getSelectors());
417 }
weijia yuan82cf9142018-10-21 12:25:02 -0700418 face.expressInterest(fetchInterest,
419 bind(&WriteHandle::onData, this, _1, _2, processId),
420 bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
421 bind(&WriteHandle::onTimeout, this, _1, processId));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700422}
423
424void
weijia yuan82cf9142018-10-21 12:25:02 -0700425WriteHandle::processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
426 const ndn::mgmt::CommandContinuation& done)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700427{
428 if (parameter.hasEndBlockId()) {
429 //normal fetch segment
weijia yuan82cf9142018-10-21 12:25:02 -0700430 SegmentNo startBlockId = parameter.hasStartBlockId() ? parameter.getStartBlockId() : 0;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700431 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700432 if (startBlockId > endBlockId) {
weijia yuan82cf9142018-10-21 12:25:02 -0700433 done(negativeReply("Malformed Command", 403));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700434 return;
435 }
436
weijia yuan82cf9142018-10-21 12:25:02 -0700437 ProcessId processId = ndn::random::generateWord64();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700438 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700439 RepoCommandResponse& response = process.response;
weijia yuan82cf9142018-10-21 12:25:02 -0700440 response.setCode(100);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700441 response.setProcessId(processId);
442 response.setInsertNum(0);
443 response.setStartBlockId(startBlockId);
444 response.setEndBlockId(endBlockId);
weijia yuan82cf9142018-10-21 12:25:02 -0700445 response.setBody(response.wireEncode());
446 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700447
448 //300 means data fetching is in progress
weijia yuan82cf9142018-10-21 12:25:02 -0700449 response.setCode(300);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700450
451 segInit(processId, parameter);
452 }
453 else {
454 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
weijia yuan82cf9142018-10-21 12:25:02 -0700455 ProcessId processId = ndn::random::generateWord64();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700456 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700457 RepoCommandResponse& response = process.response;
weijia yuan82cf9142018-10-21 12:25:02 -0700458 response.setCode(100);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700459 response.setProcessId(processId);
460 response.setInsertNum(0);
461 response.setStartBlockId(parameter.getStartBlockId());
weijia yuan82cf9142018-10-21 12:25:02 -0700462 response.setBody(response.wireEncode());
463 done(response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700464
465 //300 means data fetching is in progress
weijia yuan82cf9142018-10-21 12:25:02 -0700466 response.setCode(300);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700467
468 segInit(processId, parameter);
469 }
470}
471
472void
473WriteHandle::extendNoEndTime(ProcessInfo& process)
474{
475 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
476 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
477 RepoCommandResponse& response = process.response;
478 if (now > noEndTime) {
weijia yuan82cf9142018-10-21 12:25:02 -0700479 response.setCode(405);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700480 return;
481 }
482 //extends noEndTime
weijia yuan82cf9142018-10-21 12:25:02 -0700483 process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700484
485}
486
weijia yuan82cf9142018-10-21 12:25:02 -0700487RepoCommandResponse
488WriteHandle::negativeReply(std::string text, int statusCode)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700489{
weijia yuan82cf9142018-10-21 12:25:02 -0700490 RepoCommandResponse response(statusCode, text);
491 response.setBody(response.wireEncode());
492
493 return response;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700494}
495
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800496} // namespace repo