blob: 02837280080cdf050c56cc7e363fe4f1fc524ebf [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2018, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE.  See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "write-handle.hpp"
21
22namespace repo {
23
24static const int RETRY_TIMEOUT = 3;
25static const int DEFAULT_CREDIT = 12;
Weiqi Shi098f91c2014-07-23 17:41:35 -070026static const milliseconds NOEND_TIMEOUT(10000);
27static const milliseconds PROCESS_DELETE_TIME(10000);
28static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
Shuo Chen29c77fe2014-03-18 11:29:41 -070029
Weiqi Shif0330d52014-07-09 10:54:27 -070030WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
Junxiao Shi047a6fb2017-06-08 16:16:05 +000031 Scheduler& scheduler,
32 Validator& validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070033 : BaseHandle(face, storageHandle, keyChain, scheduler)
34 , m_validator(validator)
35 , m_retryTime(RETRY_TIMEOUT)
36 , m_credit(DEFAULT_CREDIT)
37 , m_noEndTimeout(NOEND_TIMEOUT)
Weiqi Shi098f91c2014-07-23 17:41:35 -070038 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
Shuo Chen29c77fe2014-03-18 11:29:41 -070039{
40}
41
42void
43WriteHandle::deleteProcess(ProcessId processId)
44{
45 m_processes.erase(processId);
46}
47
48// Interest.
49void
50WriteHandle::onInterest(const Name& prefix, const Interest& interest)
51{
52 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +080053 bind(&WriteHandle::onValidated, this, _1, prefix),
54 bind(&WriteHandle::onValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -070055}
56
Shuo Chen29c77fe2014-03-18 11:29:41 -070057void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040058WriteHandle::onValidated(const Interest& interest, const Name& prefix)
Shuo Chen29c77fe2014-03-18 11:29:41 -070059{
Shuo Chen29c77fe2014-03-18 11:29:41 -070060 RepoCommandParameter parameter;
61 try {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040062 extractParameter(interest, prefix, parameter);
Shuo Chen29c77fe2014-03-18 11:29:41 -070063 }
Davide Pesavento0c139512018-11-03 18:23:38 -040064 catch (const RepoCommandParameter::Error&) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040065 negativeReply(interest, 403);
Shuo Chen29c77fe2014-03-18 11:29:41 -070066 return;
67 }
68
69 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
70 if (parameter.hasSelectors()) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040071 negativeReply(interest, 402);
Shuo Chen29c77fe2014-03-18 11:29:41 -070072 return;
73 }
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040074 processSegmentedInsertCommand(interest, parameter);
Shuo Chen29c77fe2014-03-18 11:29:41 -070075 }
76 else {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040077 processSingleInsertCommand(interest, parameter);
Shuo Chen29c77fe2014-03-18 11:29:41 -070078 }
Weiqi Shi098f91c2014-07-23 17:41:35 -070079 if (parameter.hasInterestLifetime())
80 m_interestLifetime = parameter.getInterestLifetime();
Shuo Chen29c77fe2014-03-18 11:29:41 -070081}
82
83void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040084WriteHandle::onValidationFailed(const Interest& interest, const ValidationError& error)
Shuo Chen29c77fe2014-03-18 11:29:41 -070085{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040086 std::cerr << error << std::endl;
87 negativeReply(interest, 401);
Shuo Chen29c77fe2014-03-18 11:29:41 -070088}
89
90void
Alexander Afanasyev42290b22017-03-09 12:58:29 -080091WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -070092{
Shuo Chenc88c87d2014-06-25 20:21:02 +080093 m_validator.validate(data,
94 bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
95 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
96}
97
98void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -040099WriteHandle::onDataValidated(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chenc88c87d2014-06-25 20:21:02 +0800100{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700101 if (m_processes.count(processId) == 0) {
102 return;
103 }
104
105 ProcessInfo& process = m_processes[processId];
106 RepoCommandResponse& response = process.response;
107
108 if (response.getInsertNum() == 0) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400109 getStorageHandle().insertData(data);
110 // getStorageHandle().insertEntry(data);
111 // getStoreIndex().insert(data);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700112 response.setInsertNum(1);
113 }
114
115 deferredDeleteProcess(processId);
116}
117
118void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400119WriteHandle::onDataValidationFailed(const Data& data, const ValidationError& error)
Shuo Chenc88c87d2014-06-25 20:21:02 +0800120{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400121 std::cerr << error << std::endl;
Shuo Chenc88c87d2014-06-25 20:21:02 +0800122}
123
124void
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800125WriteHandle::onSegmentData(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700126{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800127 m_validator.validate(data,
128 bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId),
129 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
130}
131
132void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400133WriteHandle::onSegmentDataValidated(const Interest& interest, const Data& data, ProcessId processId)
Shuo Chenc88c87d2014-06-25 20:21:02 +0800134{
Davide Pesavento0c139512018-11-03 18:23:38 -0400135 auto it = m_processes.find(processId);
136 if (it == m_processes.end()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700137 return;
138 }
Davide Pesavento0c139512018-11-03 18:23:38 -0400139 RepoCommandResponse& response = it->second.response;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700140
141 //refresh endBlockId
Davide Pesavento0c139512018-11-03 18:23:38 -0400142 auto finalBlock = data.getFinalBlock();
143 if (finalBlock && finalBlock->isSegment()) {
144 auto finalSeg = finalBlock->toSegment();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700145 if (response.hasEndBlockId()) {
Davide Pesavento0c139512018-11-03 18:23:38 -0400146 if (finalSeg < response.getEndBlockId()) {
147 response.setEndBlockId(finalSeg);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700148 }
149 }
150 else {
Davide Pesavento0c139512018-11-03 18:23:38 -0400151 response.setEndBlockId(finalSeg);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700152 }
153 }
154
155 //insert data
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400156 if (getStorageHandle().insertData(data)) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700157 response.setInsertNum(response.getInsertNum() + 1);
158 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700159
160 onSegmentDataControl(processId, interest);
161}
162
163void
Wentao Shanga8f3c402014-10-30 14:03:27 -0700164WriteHandle::onTimeout(const Interest& interest, ProcessId processId)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700165{
Shuo Chen028dcd32014-06-21 16:36:44 +0800166 std::cerr << "Timeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700167 m_processes.erase(processId);
168}
169
170void
171WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
172{
Shuo Chen028dcd32014-06-21 16:36:44 +0800173 std::cerr << "SegTimeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700174
175 onSegmentTimeoutControl(processId, interest);
176}
177
178void
179WriteHandle::listen(const Name& prefix)
180{
Junxiao Shi2b7b8312017-06-16 03:43:24 +0000181 getFace().setInterestFilter(Name(prefix).append("insert"),
182 bind(&WriteHandle::onInterest, this, _1, _2));
183 getFace().setInterestFilter(Name(prefix).append("insert check"),
184 bind(&WriteHandle::onCheckInterest, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700185}
186
187void
188WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
189{
190 ProcessInfo& process = m_processes[processId];
191 process.credit = 0;
192
193 map<SegmentNo, int>& processRetry = process.retryCounts;
194
195 Name name = parameter.getName();
196 SegmentNo startBlockId = parameter.getStartBlockId();
197
198 uint64_t initialCredit = m_credit;
199
200 if (parameter.hasEndBlockId()) {
201 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800202 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700203 }
204 else {
205 // set noEndTimeout timer
206 process.noEndTime = ndn::time::steady_clock::now() +
207 m_noEndTimeout;
208 }
209 process.credit = initialCredit;
210 SegmentNo segment = startBlockId;
Weiqi Shi098f91c2014-07-23 17:41:35 -0700211
Shuo Chen29c77fe2014-03-18 11:29:41 -0700212 for (; segment < startBlockId + initialCredit; ++segment) {
213 Name fetchName = name;
214 fetchName.appendSegment(segment);
215 Interest interest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700216 interest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700217 getFace().expressInterest(interest,
218 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800219 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700220 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
221 process.credit--;
222 processRetry[segment] = 0;
223 }
224
225 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
226
Shuo Chen29c77fe2014-03-18 11:29:41 -0700227 process.nextSegment = segment;
228 nextSegmentQueue.push(segment);
229}
230
231void
232WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
233{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700234 if (m_processes.count(processId) == 0) {
235 return;
236 }
237 ProcessInfo& process = m_processes[processId];
238 RepoCommandResponse& response = process.response;
239 int& processCredit = process.credit;
240 //onSegmentDataControl is called when a data returns.
241 //When data returns, processCredit++
242 processCredit++;
243 SegmentNo& nextSegment = process.nextSegment;
244 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
245 map<SegmentNo, int>& retryCounts = process.retryCounts;
246
247 //read whether notime timeout
248 if (!response.hasEndBlockId()) {
249
250 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
251 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
252
253 if (now > noEndTime) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800254 std::cerr << "noEndtimeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700255 //m_processes.erase(processId);
256 //StatusCode should be refreshed as 405
257 response.setStatusCode(405);
258 //schedule a delete event
259 deferredDeleteProcess(processId);
260 return;
261 }
262 }
263
264 //read whether this process has total ends, if ends, remove control info from the maps
265 if (response.hasEndBlockId()) {
266 uint64_t nSegments =
267 response.getEndBlockId() - response.getStartBlockId() + 1;
268 if (response.getInsertNum() >= nSegments) {
269 //m_processes.erase(processId);
270 //All the data has been inserted, StatusCode is refreshed as 200
271 response.setStatusCode(200);
272 deferredDeleteProcess(processId);
273 return;
274 }
275 }
276
277 //check whether there is any credit
278 if (processCredit == 0)
279 return;
280
281
282 //check whether sent queue empty
283 if (nextSegmentQueue.empty()) {
284 //do not do anything
285 return;
286 }
287
288 //pop the queue
289 SegmentNo sendingSegment = nextSegmentQueue.front();
290 nextSegmentQueue.pop();
291
292 //check whether sendingSegment exceeds
Shuo Chenccfbe242014-04-29 23:57:51 +0800293 if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700294 //do not do anything
295 return;
296 }
297
298 //read whether this is retransmitted data;
299 SegmentNo fetchedSegment =
300 interest.getName().get(interest.getName().size() - 1).toSegment();
301
302 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
303
304 //find this fetched data, remove it from this map
305 //rit->second.erase(oit);
306 retryCounts.erase(fetchedSegment);
307 //express the interest of the top of the queue
308 Name fetchName(interest.getName().getPrefix(-1));
309 fetchName.appendSegment(sendingSegment);
310 Interest fetchInterest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700311 fetchInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700312 getFace().expressInterest(fetchInterest,
313 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800314 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700315 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
316 //When an interest is expressed, processCredit--
317 processCredit--;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700318 if (retryCounts.count(sendingSegment) == 0) {
319 //not found
320 retryCounts[sendingSegment] = 0;
321 }
322 else {
323 //found
324 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
325 }
326 //increase the next seg and put it into the queue
Shuo Chenccfbe242014-04-29 23:57:51 +0800327 if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700328 nextSegment++;
329 nextSegmentQueue.push(nextSegment);
330 }
331}
332
333void
334WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
335{
336 if (m_processes.count(processId) == 0) {
337 return;
338 }
339 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700340 // RepoCommandResponse& response = process.response;
341 // SegmentNo& nextSegment = process.nextSegment;
342 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700343 map<SegmentNo, int>& retryCounts = process.retryCounts;
344
345 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
346
Shuo Chen028dcd32014-06-21 16:36:44 +0800347 std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700348
349 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
350
351 //read the retry time. If retry out of time, fail the process. if not, plus
352 int& retryTime = retryCounts[timeoutSegment];
353 if (retryTime >= m_retryTime) {
354 //fail this process
Shuo Chen028dcd32014-06-21 16:36:44 +0800355 std::cerr << "Retry timeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700356 m_processes.erase(processId);
357 return;
358 }
359 else {
360 //Reput it in the queue, retryTime++
361 retryTime++;
362 Interest retryInterest(interest.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700363 retryInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700364 getFace().expressInterest(retryInterest,
365 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800366 bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700367 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
368 }
369
370}
371
372void
373WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
374{
375 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +0800376 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
377 bind(&WriteHandle::onCheckValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700378
379}
380
381void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400382WriteHandle::onCheckValidated(const Interest& interest, const Name& prefix)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700383{
384 RepoCommandParameter parameter;
385 try {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400386 extractParameter(interest, prefix, parameter);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700387 }
Davide Pesavento0c139512018-11-03 18:23:38 -0400388 catch (const RepoCommandParameter::Error&) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400389 negativeReply(interest, 403);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700390 return;
391 }
392
393 if (!parameter.hasProcessId()) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400394 negativeReply(interest, 403);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700395 return;
396 }
397 //check whether this process exists
398 ProcessId processId = parameter.getProcessId();
399 if (m_processes.count(processId) == 0) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800400 std::cerr << "no such processId: " << processId << std::endl;
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400401 negativeReply(interest, 404);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700402 return;
403 }
404
405 ProcessInfo& process = m_processes[processId];
406
407 RepoCommandResponse& response = process.response;
408
409 //Check whether it is single data fetching
410 if (!response.hasStartBlockId() &&
411 !response.hasEndBlockId()) {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400412 reply(interest, response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700413 return;
414 }
415
416 //read if noEndtimeout
417 if (!response.hasEndBlockId()) {
418 extendNoEndTime(process);
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400419 reply(interest, response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700420 return;
421 }
422 else {
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400423 reply(interest, response);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700424 }
425}
426
427void
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400428WriteHandle::onCheckValidationFailed(const Interest& interest, const ValidationError& error)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700429{
Alexander Afanasyevc0e26582017-08-13 21:16:49 -0400430 std::cerr << error << std::endl;
431 negativeReply(interest, 401);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700432}
433
434void
435WriteHandle::deferredDeleteProcess(ProcessId processId)
436{
437 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800438 bind(&WriteHandle::deleteProcess, this, processId));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700439}
440
441void
442WriteHandle::processSingleInsertCommand(const Interest& interest,
443 RepoCommandParameter& parameter)
444{
445 ProcessId processId = generateProcessId();
446
447 ProcessInfo& process = m_processes[processId];
448
449 RepoCommandResponse& response = process.response;
450 response.setStatusCode(100);
451 response.setProcessId(processId);
452 response.setInsertNum(0);
453
454 reply(interest, response);
455
456 response.setStatusCode(300);
457
458 Interest fetchInterest(parameter.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700459 fetchInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700460 if (parameter.hasSelectors()) {
461 fetchInterest.setSelectors(parameter.getSelectors());
462 }
463 getFace().expressInterest(fetchInterest,
464 bind(&WriteHandle::onData, this, _1, _2, processId),
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800465 bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
Shuo Chen29c77fe2014-03-18 11:29:41 -0700466 bind(&WriteHandle::onTimeout, this, _1, processId));
467}
468
469void
470WriteHandle::processSegmentedInsertCommand(const Interest& interest,
471 RepoCommandParameter& parameter)
472{
473 if (parameter.hasEndBlockId()) {
474 //normal fetch segment
475 if (!parameter.hasStartBlockId()) {
476 parameter.setStartBlockId(0);
477 }
478
479 SegmentNo startBlockId = parameter.getStartBlockId();
480 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700481 if (startBlockId > endBlockId) {
482 negativeReply(interest, 403);
483 return;
484 }
485
486 ProcessId processId = generateProcessId();
487 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700488 RepoCommandResponse& response = process.response;
489 response.setStatusCode(100);
490 response.setProcessId(processId);
491 response.setInsertNum(0);
492 response.setStartBlockId(startBlockId);
493 response.setEndBlockId(endBlockId);
494
495 reply(interest, response);
496
497 //300 means data fetching is in progress
498 response.setStatusCode(300);
499
500 segInit(processId, parameter);
501 }
502 else {
503 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
504 ProcessId processId = generateProcessId();
505 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700506 RepoCommandResponse& response = process.response;
507 response.setStatusCode(100);
508 response.setProcessId(processId);
509 response.setInsertNum(0);
510 response.setStartBlockId(parameter.getStartBlockId());
511 reply(interest, response);
512
513 //300 means data fetching is in progress
514 response.setStatusCode(300);
515
516 segInit(processId, parameter);
517 }
518}
519
520void
521WriteHandle::extendNoEndTime(ProcessInfo& process)
522{
523 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
524 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
525 RepoCommandResponse& response = process.response;
526 if (now > noEndTime) {
527 response.setStatusCode(405);
528 return;
529 }
530 //extends noEndTime
531 process.noEndTime =
532 ndn::time::steady_clock::now() + m_noEndTimeout;
533
534}
535
536void
537WriteHandle::negativeReply(const Interest& interest, int statusCode)
538{
539 RepoCommandResponse response;
540 response.setStatusCode(statusCode);
541 reply(interest, response);
542}
543
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800544} // namespace repo