blob: 16c622edaa7df63bdad6f3a8b0cc2a1146de074d [file] [log] [blame]
Shuo Chen29c77fe2014-03-18 11:29:41 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyev42290b22017-03-09 12:58:29 -08003 * Copyright (c) 2014-2017, Regents of the University of California.
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07004 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Shuo Chen29c77fe2014-03-18 11:29:41 -070018 */
19
20#include "write-handle.hpp"
21
22namespace repo {
23
24static const int RETRY_TIMEOUT = 3;
25static const int DEFAULT_CREDIT = 12;
Weiqi Shi098f91c2014-07-23 17:41:35 -070026static const milliseconds NOEND_TIMEOUT(10000);
27static const milliseconds PROCESS_DELETE_TIME(10000);
28static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
Shuo Chen29c77fe2014-03-18 11:29:41 -070029
Weiqi Shif0330d52014-07-09 10:54:27 -070030WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
31 Scheduler& scheduler,// RepoStorage& storeindex,
32 ValidatorConfig& validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070033 : BaseHandle(face, storageHandle, keyChain, scheduler)
34 , m_validator(validator)
35 , m_retryTime(RETRY_TIMEOUT)
36 , m_credit(DEFAULT_CREDIT)
37 , m_noEndTimeout(NOEND_TIMEOUT)
Weiqi Shi098f91c2014-07-23 17:41:35 -070038 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
Shuo Chen29c77fe2014-03-18 11:29:41 -070039{
40}
41
42void
43WriteHandle::deleteProcess(ProcessId processId)
44{
45 m_processes.erase(processId);
46}
47
48// Interest.
49void
50WriteHandle::onInterest(const Name& prefix, const Interest& interest)
51{
52 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +080053 bind(&WriteHandle::onValidated, this, _1, prefix),
54 bind(&WriteHandle::onValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -070055}
56
57// onRegisterFailed.
58void
59WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
60{
Shuo Chenc88c87d2014-06-25 20:21:02 +080061 std::cerr << reason << std::endl;
Alexander Afanasyev42290b22017-03-09 12:58:29 -080062 BOOST_THROW_EXCEPTION(Error("Insert prefix registration failed"));
Shuo Chen29c77fe2014-03-18 11:29:41 -070063}
64
65// onRegisterFailed for insert.
66void
67WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
68{
Shuo Chenc88c87d2014-06-25 20:21:02 +080069 std::cerr << reason << std::endl;
Alexander Afanasyev42290b22017-03-09 12:58:29 -080070 BOOST_THROW_EXCEPTION(Error("Insert check prefix registration failed"));
Shuo Chen29c77fe2014-03-18 11:29:41 -070071}
72
73void
Wentao Shanga8f3c402014-10-30 14:03:27 -070074WriteHandle::onValidated(const std::shared_ptr<const Interest>& interest, const Name& prefix)
Shuo Chen29c77fe2014-03-18 11:29:41 -070075{
76 //m_validResult = 1;
77 RepoCommandParameter parameter;
78 try {
79 extractParameter(*interest, prefix, parameter);
80 }
81 catch (RepoCommandParameter::Error) {
82 negativeReply(*interest, 403);
83 return;
84 }
85
86 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
87 if (parameter.hasSelectors()) {
88 negativeReply(*interest, 402);
89 return;
90 }
91 processSegmentedInsertCommand(*interest, parameter);
92 }
93 else {
94 processSingleInsertCommand(*interest, parameter);
95 }
Weiqi Shi098f91c2014-07-23 17:41:35 -070096 if (parameter.hasInterestLifetime())
97 m_interestLifetime = parameter.getInterestLifetime();
Shuo Chen29c77fe2014-03-18 11:29:41 -070098}
99
100void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700101WriteHandle::onValidationFailed(const std::shared_ptr<const Interest>& interest,
102 const std::string& reason)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700103{
Shuo Chen028dcd32014-06-21 16:36:44 +0800104 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700105 negativeReply(*interest, 401);
106}
107
108void
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800109WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700110{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800111 m_validator.validate(data,
112 bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
113 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
114}
115
116void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700117WriteHandle::onDataValidated(const Interest& interest,
118 const std::shared_ptr<const Data>& data,
Shuo Chenc88c87d2014-06-25 20:21:02 +0800119 ProcessId processId)
120{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700121 if (m_processes.count(processId) == 0) {
122 return;
123 }
124
125 ProcessInfo& process = m_processes[processId];
126 RepoCommandResponse& response = process.response;
127
128 if (response.getInsertNum() == 0) {
Shuo Chenc88c87d2014-06-25 20:21:02 +0800129 getStorageHandle().insertData(*data);
Weiqi Shif0330d52014-07-09 10:54:27 -0700130 // getStorageHandle().insertEntry(*data);
131 // getStoreIndex().insert(*data);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700132 response.setInsertNum(1);
133 }
134
135 deferredDeleteProcess(processId);
136}
137
138void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700139WriteHandle::onDataValidationFailed(const std::shared_ptr<const Data>& data,
140 const std::string& reason)
Shuo Chenc88c87d2014-06-25 20:21:02 +0800141{
142 std::cerr << reason << std::endl;
143}
144
145void
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800146WriteHandle::onSegmentData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700147{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800148 m_validator.validate(data,
149 bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId),
150 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
151}
152
153void
154WriteHandle::onSegmentDataValidated(const Interest& interest,
Wentao Shanga8f3c402014-10-30 14:03:27 -0700155 const std::shared_ptr<const Data>& data,
Shuo Chenc88c87d2014-06-25 20:21:02 +0800156 ProcessId processId)
157{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700158 if (m_processes.count(processId) == 0) {
159 return;
160 }
161 RepoCommandResponse& response = m_processes[processId].response;
162
163 //refresh endBlockId
Shuo Chenc88c87d2014-06-25 20:21:02 +0800164 Name::Component finalBlockId = data->getFinalBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700165
166 if (!finalBlockId.empty()) {
167 SegmentNo final = finalBlockId.toSegment();
168 if (response.hasEndBlockId()) {
169 if (final < response.getEndBlockId()) {
170 response.setEndBlockId(final);
171 }
172 }
173 else {
174 response.setEndBlockId(final);
175 }
176 }
177
178 //insert data
Shuo Chenc88c87d2014-06-25 20:21:02 +0800179 if (getStorageHandle().insertData(*data)) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700180 response.setInsertNum(response.getInsertNum() + 1);
181 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700182
183 onSegmentDataControl(processId, interest);
184}
185
186void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700187WriteHandle::onTimeout(const Interest& interest, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700188{
Shuo Chen028dcd32014-06-21 16:36:44 +0800189 std::cerr << "Timeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700190 m_processes.erase(processId);
191}
192
193void
194WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
195{
Shuo Chen028dcd32014-06-21 16:36:44 +0800196 std::cerr << "SegTimeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700197
198 onSegmentTimeoutControl(processId, interest);
199}
200
201void
202WriteHandle::listen(const Name& prefix)
203{
204 Name insertPrefix;
205 insertPrefix.append(prefix).append("insert");
Weiqi Shif0330d52014-07-09 10:54:27 -0700206 ndn::InterestFilter filter_insert(insertPrefix);
207 getFace().setInterestFilter(filter_insert,
Shuo Chen29c77fe2014-03-18 11:29:41 -0700208 bind(&WriteHandle::onInterest, this, _1, _2),
209 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
210 Name insertCheckPrefix;
211 insertCheckPrefix.append(prefix).append("insert check");
Weiqi Shif0330d52014-07-09 10:54:27 -0700212 ndn::InterestFilter filter_insertCheck(insertCheckPrefix);
213 getFace().setInterestFilter(filter_insertCheck,
Shuo Chen29c77fe2014-03-18 11:29:41 -0700214 bind(&WriteHandle::onCheckInterest, this, _1, _2),
215 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
216}
217
218void
219WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
220{
221 ProcessInfo& process = m_processes[processId];
222 process.credit = 0;
223
224 map<SegmentNo, int>& processRetry = process.retryCounts;
225
226 Name name = parameter.getName();
227 SegmentNo startBlockId = parameter.getStartBlockId();
228
229 uint64_t initialCredit = m_credit;
230
231 if (parameter.hasEndBlockId()) {
232 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800233 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700234 }
235 else {
236 // set noEndTimeout timer
237 process.noEndTime = ndn::time::steady_clock::now() +
238 m_noEndTimeout;
239 }
240 process.credit = initialCredit;
241 SegmentNo segment = startBlockId;
Weiqi Shi098f91c2014-07-23 17:41:35 -0700242
Shuo Chen29c77fe2014-03-18 11:29:41 -0700243 for (; segment < startBlockId + initialCredit; ++segment) {
244 Name fetchName = name;
245 fetchName.appendSegment(segment);
246 Interest interest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700247 interest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700248 getFace().expressInterest(interest,
249 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800250 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700251 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
252 process.credit--;
253 processRetry[segment] = 0;
254 }
255
256 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
257
Shuo Chen29c77fe2014-03-18 11:29:41 -0700258 process.nextSegment = segment;
259 nextSegmentQueue.push(segment);
260}
261
262void
263WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
264{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700265 if (m_processes.count(processId) == 0) {
266 return;
267 }
268 ProcessInfo& process = m_processes[processId];
269 RepoCommandResponse& response = process.response;
270 int& processCredit = process.credit;
271 //onSegmentDataControl is called when a data returns.
272 //When data returns, processCredit++
273 processCredit++;
274 SegmentNo& nextSegment = process.nextSegment;
275 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
276 map<SegmentNo, int>& retryCounts = process.retryCounts;
277
278 //read whether notime timeout
279 if (!response.hasEndBlockId()) {
280
281 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
282 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
283
284 if (now > noEndTime) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800285 std::cerr << "noEndtimeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700286 //m_processes.erase(processId);
287 //StatusCode should be refreshed as 405
288 response.setStatusCode(405);
289 //schedule a delete event
290 deferredDeleteProcess(processId);
291 return;
292 }
293 }
294
295 //read whether this process has total ends, if ends, remove control info from the maps
296 if (response.hasEndBlockId()) {
297 uint64_t nSegments =
298 response.getEndBlockId() - response.getStartBlockId() + 1;
299 if (response.getInsertNum() >= nSegments) {
300 //m_processes.erase(processId);
301 //All the data has been inserted, StatusCode is refreshed as 200
302 response.setStatusCode(200);
303 deferredDeleteProcess(processId);
304 return;
305 }
306 }
307
308 //check whether there is any credit
309 if (processCredit == 0)
310 return;
311
312
313 //check whether sent queue empty
314 if (nextSegmentQueue.empty()) {
315 //do not do anything
316 return;
317 }
318
319 //pop the queue
320 SegmentNo sendingSegment = nextSegmentQueue.front();
321 nextSegmentQueue.pop();
322
323 //check whether sendingSegment exceeds
Shuo Chenccfbe242014-04-29 23:57:51 +0800324 if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700325 //do not do anything
326 return;
327 }
328
329 //read whether this is retransmitted data;
330 SegmentNo fetchedSegment =
331 interest.getName().get(interest.getName().size() - 1).toSegment();
332
333 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
334
335 //find this fetched data, remove it from this map
336 //rit->second.erase(oit);
337 retryCounts.erase(fetchedSegment);
338 //express the interest of the top of the queue
339 Name fetchName(interest.getName().getPrefix(-1));
340 fetchName.appendSegment(sendingSegment);
341 Interest fetchInterest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700342 fetchInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700343 getFace().expressInterest(fetchInterest,
344 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800345 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700346 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
347 //When an interest is expressed, processCredit--
348 processCredit--;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700349 if (retryCounts.count(sendingSegment) == 0) {
350 //not found
351 retryCounts[sendingSegment] = 0;
352 }
353 else {
354 //found
355 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
356 }
357 //increase the next seg and put it into the queue
Shuo Chenccfbe242014-04-29 23:57:51 +0800358 if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700359 nextSegment++;
360 nextSegmentQueue.push(nextSegment);
361 }
362}
363
364void
365WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
366{
367 if (m_processes.count(processId) == 0) {
368 return;
369 }
370 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700371 // RepoCommandResponse& response = process.response;
372 // SegmentNo& nextSegment = process.nextSegment;
373 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700374 map<SegmentNo, int>& retryCounts = process.retryCounts;
375
376 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
377
Shuo Chen028dcd32014-06-21 16:36:44 +0800378 std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700379
380 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
381
382 //read the retry time. If retry out of time, fail the process. if not, plus
383 int& retryTime = retryCounts[timeoutSegment];
384 if (retryTime >= m_retryTime) {
385 //fail this process
Shuo Chen028dcd32014-06-21 16:36:44 +0800386 std::cerr << "Retry timeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700387 m_processes.erase(processId);
388 return;
389 }
390 else {
391 //Reput it in the queue, retryTime++
392 retryTime++;
393 Interest retryInterest(interest.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700394 retryInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700395 getFace().expressInterest(retryInterest,
396 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800397 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700398 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
399 }
400
401}
402
403void
404WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
405{
406 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +0800407 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
408 bind(&WriteHandle::onCheckValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700409
410}
411
412void
413WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
414{
415 RepoCommandParameter parameter;
416 try {
417 extractParameter(*interest, prefix, parameter);
418 }
419 catch (RepoCommandParameter::Error) {
420 negativeReply(*interest, 403);
421 return;
422 }
423
424 if (!parameter.hasProcessId()) {
425 negativeReply(*interest, 403);
426 return;
427 }
428 //check whether this process exists
429 ProcessId processId = parameter.getProcessId();
430 if (m_processes.count(processId) == 0) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800431 std::cerr << "no such processId: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700432 negativeReply(*interest, 404);
433 return;
434 }
435
436 ProcessInfo& process = m_processes[processId];
437
438 RepoCommandResponse& response = process.response;
439
440 //Check whether it is single data fetching
441 if (!response.hasStartBlockId() &&
442 !response.hasEndBlockId()) {
443 reply(*interest, response);
444 return;
445 }
446
447 //read if noEndtimeout
448 if (!response.hasEndBlockId()) {
449 extendNoEndTime(process);
450 reply(*interest, response);
451 return;
452 }
453 else {
454 reply(*interest, response);
455 }
456}
457
458void
Shuo Chenc88c87d2014-06-25 20:21:02 +0800459WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest,
460 const std::string& reason)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700461{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800462 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700463 negativeReply(*interest, 401);
464}
465
466void
467WriteHandle::deferredDeleteProcess(ProcessId processId)
468{
469 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800470 bind(&WriteHandle::deleteProcess, this, processId));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700471}
472
473void
474WriteHandle::processSingleInsertCommand(const Interest& interest,
475 RepoCommandParameter& parameter)
476{
477 ProcessId processId = generateProcessId();
478
479 ProcessInfo& process = m_processes[processId];
480
481 RepoCommandResponse& response = process.response;
482 response.setStatusCode(100);
483 response.setProcessId(processId);
484 response.setInsertNum(0);
485
486 reply(interest, response);
487
488 response.setStatusCode(300);
489
490 Interest fetchInterest(parameter.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700491 fetchInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700492 if (parameter.hasSelectors()) {
493 fetchInterest.setSelectors(parameter.getSelectors());
494 }
495 getFace().expressInterest(fetchInterest,
496 bind(&WriteHandle::onData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800497 bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700498 bind(&WriteHandle::onTimeout, this, _1, processId));
499}
500
501void
502WriteHandle::processSegmentedInsertCommand(const Interest& interest,
503 RepoCommandParameter& parameter)
504{
505 if (parameter.hasEndBlockId()) {
506 //normal fetch segment
507 if (!parameter.hasStartBlockId()) {
508 parameter.setStartBlockId(0);
509 }
510
511 SegmentNo startBlockId = parameter.getStartBlockId();
512 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700513 if (startBlockId > endBlockId) {
514 negativeReply(interest, 403);
515 return;
516 }
517
518 ProcessId processId = generateProcessId();
519 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700520 RepoCommandResponse& response = process.response;
521 response.setStatusCode(100);
522 response.setProcessId(processId);
523 response.setInsertNum(0);
524 response.setStartBlockId(startBlockId);
525 response.setEndBlockId(endBlockId);
526
527 reply(interest, response);
528
529 //300 means data fetching is in progress
530 response.setStatusCode(300);
531
532 segInit(processId, parameter);
533 }
534 else {
535 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
536 ProcessId processId = generateProcessId();
537 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700538 RepoCommandResponse& response = process.response;
539 response.setStatusCode(100);
540 response.setProcessId(processId);
541 response.setInsertNum(0);
542 response.setStartBlockId(parameter.getStartBlockId());
543 reply(interest, response);
544
545 //300 means data fetching is in progress
546 response.setStatusCode(300);
547
548 segInit(processId, parameter);
549 }
550}
551
552void
553WriteHandle::extendNoEndTime(ProcessInfo& process)
554{
555 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
556 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
557 RepoCommandResponse& response = process.response;
558 if (now > noEndTime) {
559 response.setStatusCode(405);
560 return;
561 }
562 //extends noEndTime
563 process.noEndTime =
564 ndn::time::steady_clock::now() + m_noEndTimeout;
565
566}
567
568void
569WriteHandle::negativeReply(const Interest& interest, int statusCode)
570{
571 RepoCommandResponse response;
572 response.setStatusCode(statusCode);
573 reply(interest, response);
574}
575
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800576} // namespace repo