/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2017, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "write-handle.hpp"
21
22namespace repo {
23
24static const int RETRY_TIMEOUT = 3;
25static const int DEFAULT_CREDIT = 12;
Weiqi Shi098f91c2014-07-23 17:41:35 -070026static const milliseconds NOEND_TIMEOUT(10000);
27static const milliseconds PROCESS_DELETE_TIME(10000);
28static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
Shuo Chen29c77fe2014-03-18 11:29:41 -070029
Weiqi Shif0330d52014-07-09 10:54:27 -070030WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
Junxiao Shi047a6fb2017-06-08 16:16:05 +000031 Scheduler& scheduler,
32 Validator& validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070033 : BaseHandle(face, storageHandle, keyChain, scheduler)
34 , m_validator(validator)
35 , m_retryTime(RETRY_TIMEOUT)
36 , m_credit(DEFAULT_CREDIT)
37 , m_noEndTimeout(NOEND_TIMEOUT)
Weiqi Shi098f91c2014-07-23 17:41:35 -070038 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
Shuo Chen29c77fe2014-03-18 11:29:41 -070039{
40}
41
42void
43WriteHandle::deleteProcess(ProcessId processId)
44{
45 m_processes.erase(processId);
46}
47
48// Interest.
49void
50WriteHandle::onInterest(const Name& prefix, const Interest& interest)
51{
52 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +080053 bind(&WriteHandle::onValidated, this, _1, prefix),
54 bind(&WriteHandle::onValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -070055}
56
Shuo Chen29c77fe2014-03-18 11:29:41 -070057void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040058WriteHandle::onValidated(const Interest& interest, const Name& prefix)
Shuo Chen29c77fe2014-03-18 11:29:41 -070059{
60 //m_validResult = 1;
61 RepoCommandParameter parameter;
62 try {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040063 extractParameter(interest, prefix, parameter);
Shuo Chen29c77fe2014-03-18 11:29:41 -070064 }
65 catch (RepoCommandParameter::Error) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040066 negativeReply(interest, 403);
Shuo Chen29c77fe2014-03-18 11:29:41 -070067 return;
68 }
69
70 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
71 if (parameter.hasSelectors()) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040072 negativeReply(interest, 402);
Shuo Chen29c77fe2014-03-18 11:29:41 -070073 return;
74 }
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040075 processSegmentedInsertCommand(interest, parameter);
Shuo Chen29c77fe2014-03-18 11:29:41 -070076 }
77 else {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040078 processSingleInsertCommand(interest, parameter);
Shuo Chen29c77fe2014-03-18 11:29:41 -070079 }
Weiqi Shi098f91c2014-07-23 17:41:35 -070080 if (parameter.hasInterestLifetime())
81 m_interestLifetime = parameter.getInterestLifetime();
Shuo Chen29c77fe2014-03-18 11:29:41 -070082}
83
84void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040085WriteHandle::onValidationFailed(const Interest& interest, const ValidationError& error)
Shuo Chen29c77fe2014-03-18 11:29:41 -070086{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040087 std::cerr << error << std::endl;
88 negativeReply(interest, 401);
Shuo Chen29c77fe2014-03-18 11:29:41 -070089}
90
91void
Alexander Afanasyev42290b22017-03-09 12:58:29 -080092WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -070093{
Shuo Chenc88c87d2014-06-25 20:21:02 +080094 m_validator.validate(data,
95 bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
96 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
97}
98
99void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400100WriteHandle::onDataValidated(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chenc88c87d2014-06-25 20:21:02 +0800101{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700102 if (m_processes.count(processId) == 0) {
103 return;
104 }
105
106 ProcessInfo& process = m_processes[processId];
107 RepoCommandResponse& response = process.response;
108
109 if (response.getInsertNum() == 0) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400110 getStorageHandle().insertData(data);
111 // getStorageHandle().insertEntry(data);
112 // getStoreIndex().insert(data);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700113 response.setInsertNum(1);
114 }
115
116 deferredDeleteProcess(processId);
117}
118
119void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400120WriteHandle::onDataValidationFailed(const Data& data, const ValidationError& error)
Shuo Chenc88c87d2014-06-25 20:21:02 +0800121{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400122 std::cerr << error << std::endl;
Shuo Chenc88c87d2014-06-25 20:21:02 +0800123}
124
125void
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800126WriteHandle::onSegmentData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700127{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800128 m_validator.validate(data,
129 bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId),
130 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
131}
132
133void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400134WriteHandle::onSegmentDataValidated(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chenc88c87d2014-06-25 20:21:02 +0800135{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700136 if (m_processes.count(processId) == 0) {
137 return;
138 }
139 RepoCommandResponse& response = m_processes[processId].response;
140
141 //refresh endBlockId
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400142 Name::Component finalBlockId = data.getFinalBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700143
144 if (!finalBlockId.empty()) {
145 SegmentNo final = finalBlockId.toSegment();
146 if (response.hasEndBlockId()) {
147 if (final < response.getEndBlockId()) {
148 response.setEndBlockId(final);
149 }
150 }
151 else {
152 response.setEndBlockId(final);
153 }
154 }
155
156 //insert data
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400157 if (getStorageHandle().insertData(data)) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700158 response.setInsertNum(response.getInsertNum() + 1);
159 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700160
161 onSegmentDataControl(processId, interest);
162}
163
164void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700165WriteHandle::onTimeout(const Interest& interest, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700166{
Shuo Chen028dcd32014-06-21 16:36:44 +0800167 std::cerr << "Timeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700168 m_processes.erase(processId);
169}
170
171void
172WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
173{
Shuo Chen028dcd32014-06-21 16:36:44 +0800174 std::cerr << "SegTimeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700175
176 onSegmentTimeoutControl(processId, interest);
177}
178
179void
180WriteHandle::listen(const Name& prefix)
181{
Junxiao Shi2b7b8312017-06-16 03:43:24 +0000182 getFace().setInterestFilter(Name(prefix).append("insert"),
183 bind(&WriteHandle::onInterest, this, _1, _2));
184 getFace().setInterestFilter(Name(prefix).append("insert check"),
185 bind(&WriteHandle::onCheckInterest, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700186}
187
188void
189WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
190{
191 ProcessInfo& process = m_processes[processId];
192 process.credit = 0;
193
194 map<SegmentNo, int>& processRetry = process.retryCounts;
195
196 Name name = parameter.getName();
197 SegmentNo startBlockId = parameter.getStartBlockId();
198
199 uint64_t initialCredit = m_credit;
200
201 if (parameter.hasEndBlockId()) {
202 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800203 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700204 }
205 else {
206 // set noEndTimeout timer
207 process.noEndTime = ndn::time::steady_clock::now() +
208 m_noEndTimeout;
209 }
210 process.credit = initialCredit;
211 SegmentNo segment = startBlockId;
Weiqi Shi098f91c2014-07-23 17:41:35 -0700212
Shuo Chen29c77fe2014-03-18 11:29:41 -0700213 for (; segment < startBlockId + initialCredit; ++segment) {
214 Name fetchName = name;
215 fetchName.appendSegment(segment);
216 Interest interest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700217 interest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700218 getFace().expressInterest(interest,
219 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800220 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700221 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
222 process.credit--;
223 processRetry[segment] = 0;
224 }
225
226 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
227
Shuo Chen29c77fe2014-03-18 11:29:41 -0700228 process.nextSegment = segment;
229 nextSegmentQueue.push(segment);
230}
231
232void
233WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
234{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700235 if (m_processes.count(processId) == 0) {
236 return;
237 }
238 ProcessInfo& process = m_processes[processId];
239 RepoCommandResponse& response = process.response;
240 int& processCredit = process.credit;
241 //onSegmentDataControl is called when a data returns.
242 //When data returns, processCredit++
243 processCredit++;
244 SegmentNo& nextSegment = process.nextSegment;
245 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
246 map<SegmentNo, int>& retryCounts = process.retryCounts;
247
248 //read whether notime timeout
249 if (!response.hasEndBlockId()) {
250
251 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
252 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
253
254 if (now > noEndTime) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800255 std::cerr << "noEndtimeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700256 //m_processes.erase(processId);
257 //StatusCode should be refreshed as 405
258 response.setStatusCode(405);
259 //schedule a delete event
260 deferredDeleteProcess(processId);
261 return;
262 }
263 }
264
265 //read whether this process has total ends, if ends, remove control info from the maps
266 if (response.hasEndBlockId()) {
267 uint64_t nSegments =
268 response.getEndBlockId() - response.getStartBlockId() + 1;
269 if (response.getInsertNum() >= nSegments) {
270 //m_processes.erase(processId);
271 //All the data has been inserted, StatusCode is refreshed as 200
272 response.setStatusCode(200);
273 deferredDeleteProcess(processId);
274 return;
275 }
276 }
277
278 //check whether there is any credit
279 if (processCredit == 0)
280 return;
281
282
283 //check whether sent queue empty
284 if (nextSegmentQueue.empty()) {
285 //do not do anything
286 return;
287 }
288
289 //pop the queue
290 SegmentNo sendingSegment = nextSegmentQueue.front();
291 nextSegmentQueue.pop();
292
293 //check whether sendingSegment exceeds
Shuo Chenccfbe242014-04-29 23:57:51 +0800294 if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700295 //do not do anything
296 return;
297 }
298
299 //read whether this is retransmitted data;
300 SegmentNo fetchedSegment =
301 interest.getName().get(interest.getName().size() - 1).toSegment();
302
303 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
304
305 //find this fetched data, remove it from this map
306 //rit->second.erase(oit);
307 retryCounts.erase(fetchedSegment);
308 //express the interest of the top of the queue
309 Name fetchName(interest.getName().getPrefix(-1));
310 fetchName.appendSegment(sendingSegment);
311 Interest fetchInterest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700312 fetchInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700313 getFace().expressInterest(fetchInterest,
314 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800315 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700316 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
317 //When an interest is expressed, processCredit--
318 processCredit--;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700319 if (retryCounts.count(sendingSegment) == 0) {
320 //not found
321 retryCounts[sendingSegment] = 0;
322 }
323 else {
324 //found
325 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
326 }
327 //increase the next seg and put it into the queue
Shuo Chenccfbe242014-04-29 23:57:51 +0800328 if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700329 nextSegment++;
330 nextSegmentQueue.push(nextSegment);
331 }
332}
333
334void
335WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
336{
337 if (m_processes.count(processId) == 0) {
338 return;
339 }
340 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700341 // RepoCommandResponse& response = process.response;
342 // SegmentNo& nextSegment = process.nextSegment;
343 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700344 map<SegmentNo, int>& retryCounts = process.retryCounts;
345
346 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
347
Shuo Chen028dcd32014-06-21 16:36:44 +0800348 std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700349
350 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
351
352 //read the retry time. If retry out of time, fail the process. if not, plus
353 int& retryTime = retryCounts[timeoutSegment];
354 if (retryTime >= m_retryTime) {
355 //fail this process
Shuo Chen028dcd32014-06-21 16:36:44 +0800356 std::cerr << "Retry timeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700357 m_processes.erase(processId);
358 return;
359 }
360 else {
361 //Reput it in the queue, retryTime++
362 retryTime++;
363 Interest retryInterest(interest.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700364 retryInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700365 getFace().expressInterest(retryInterest,
366 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800367 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700368 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
369 }
370
371}
372
373void
374WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
375{
376 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +0800377 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
378 bind(&WriteHandle::onCheckValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700379
380}
381
382void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400383WriteHandle::onCheckValidated(const Interest& interest, const Name& prefix)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700384{
385 RepoCommandParameter parameter;
386 try {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400387 extractParameter(interest, prefix, parameter);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700388 }
389 catch (RepoCommandParameter::Error) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400390 negativeReply(interest, 403);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700391 return;
392 }
393
394 if (!parameter.hasProcessId()) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400395 negativeReply(interest, 403);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700396 return;
397 }
398 //check whether this process exists
399 ProcessId processId = parameter.getProcessId();
400 if (m_processes.count(processId) == 0) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800401 std::cerr << "no such processId: " << processId << std::endl;
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400402 negativeReply(interest, 404);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700403 return;
404 }
405
406 ProcessInfo& process = m_processes[processId];
407
408 RepoCommandResponse& response = process.response;
409
410 //Check whether it is single data fetching
411 if (!response.hasStartBlockId() &&
412 !response.hasEndBlockId()) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400413 reply(interest, response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700414 return;
415 }
416
417 //read if noEndtimeout
418 if (!response.hasEndBlockId()) {
419 extendNoEndTime(process);
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400420 reply(interest, response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700421 return;
422 }
423 else {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400424 reply(interest, response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700425 }
426}
427
428void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400429WriteHandle::onCheckValidationFailed(const Interest& interest, const ValidationError& error)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700430{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400431 std::cerr << error << std::endl;
432 negativeReply(interest, 401);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700433}
434
435void
436WriteHandle::deferredDeleteProcess(ProcessId processId)
437{
438 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800439 bind(&WriteHandle::deleteProcess, this, processId));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700440}
441
442void
443WriteHandle::processSingleInsertCommand(const Interest& interest,
444 RepoCommandParameter& parameter)
445{
446 ProcessId processId = generateProcessId();
447
448 ProcessInfo& process = m_processes[processId];
449
450 RepoCommandResponse& response = process.response;
451 response.setStatusCode(100);
452 response.setProcessId(processId);
453 response.setInsertNum(0);
454
455 reply(interest, response);
456
457 response.setStatusCode(300);
458
459 Interest fetchInterest(parameter.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700460 fetchInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700461 if (parameter.hasSelectors()) {
462 fetchInterest.setSelectors(parameter.getSelectors());
463 }
464 getFace().expressInterest(fetchInterest,
465 bind(&WriteHandle::onData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800466 bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700467 bind(&WriteHandle::onTimeout, this, _1, processId));
468}
469
470void
471WriteHandle::processSegmentedInsertCommand(const Interest& interest,
472 RepoCommandParameter& parameter)
473{
474 if (parameter.hasEndBlockId()) {
475 //normal fetch segment
476 if (!parameter.hasStartBlockId()) {
477 parameter.setStartBlockId(0);
478 }
479
480 SegmentNo startBlockId = parameter.getStartBlockId();
481 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700482 if (startBlockId > endBlockId) {
483 negativeReply(interest, 403);
484 return;
485 }
486
487 ProcessId processId = generateProcessId();
488 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700489 RepoCommandResponse& response = process.response;
490 response.setStatusCode(100);
491 response.setProcessId(processId);
492 response.setInsertNum(0);
493 response.setStartBlockId(startBlockId);
494 response.setEndBlockId(endBlockId);
495
496 reply(interest, response);
497
498 //300 means data fetching is in progress
499 response.setStatusCode(300);
500
501 segInit(processId, parameter);
502 }
503 else {
504 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
505 ProcessId processId = generateProcessId();
506 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700507 RepoCommandResponse& response = process.response;
508 response.setStatusCode(100);
509 response.setProcessId(processId);
510 response.setInsertNum(0);
511 response.setStartBlockId(parameter.getStartBlockId());
512 reply(interest, response);
513
514 //300 means data fetching is in progress
515 response.setStatusCode(300);
516
517 segInit(processId, parameter);
518 }
519}
520
521void
522WriteHandle::extendNoEndTime(ProcessInfo& process)
523{
524 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
525 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
526 RepoCommandResponse& response = process.response;
527 if (now > noEndTime) {
528 response.setStatusCode(405);
529 return;
530 }
531 //extends noEndTime
532 process.noEndTime =
533 ndn::time::steady_clock::now() + m_noEndTimeout;
534
535}
536
537void
538WriteHandle::negativeReply(const Interest& interest, int statusCode)
539{
540 RepoCommandResponse response;
541 response.setStatusCode(statusCode);
542 reply(interest, response);
543}
544
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800545} // namespace repo