blob: 0bc4e127ed7675c22eb386690497fc0f54f39b57 [file] [log] [blame]
Shuo Chen29c77fe2014-03-18 11:29:41 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07003 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Shuo Chen29c77fe2014-03-18 11:29:41 -070018 */
19
20#include "write-handle.hpp"
21
22namespace repo {
Weiqi Shi098f91c2014-07-23 17:41:35 -070023using namespace ndn::time;
Shuo Chen29c77fe2014-03-18 11:29:41 -070024
25static const int RETRY_TIMEOUT = 3;
26static const int DEFAULT_CREDIT = 12;
Weiqi Shi098f91c2014-07-23 17:41:35 -070027static const milliseconds NOEND_TIMEOUT(10000);
28static const milliseconds PROCESS_DELETE_TIME(10000);
29static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
Shuo Chen29c77fe2014-03-18 11:29:41 -070030
Weiqi Shif0330d52014-07-09 10:54:27 -070031WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
32 Scheduler& scheduler,// RepoStorage& storeindex,
33 ValidatorConfig& validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070034 : BaseHandle(face, storageHandle, keyChain, scheduler)
35 , m_validator(validator)
36 , m_retryTime(RETRY_TIMEOUT)
37 , m_credit(DEFAULT_CREDIT)
38 , m_noEndTimeout(NOEND_TIMEOUT)
Weiqi Shi098f91c2014-07-23 17:41:35 -070039 , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
Shuo Chen29c77fe2014-03-18 11:29:41 -070040{
41}
42
43void
44WriteHandle::deleteProcess(ProcessId processId)
45{
46 m_processes.erase(processId);
47}
48
49// Interest.
50void
51WriteHandle::onInterest(const Name& prefix, const Interest& interest)
52{
53 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +080054 bind(&WriteHandle::onValidated, this, _1, prefix),
55 bind(&WriteHandle::onValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -070056}
57
58// onRegisterFailed.
59void
60WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
61{
Shuo Chenc88c87d2014-06-25 20:21:02 +080062 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -070063 throw Error("Insert prefix registration failed");
64}
65
66// onRegisterFailed for insert.
67void
68WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
69{
Shuo Chenc88c87d2014-06-25 20:21:02 +080070 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -070071 throw Error("Insert check prefix registration failed");
72}
73
74void
75WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
76{
77 //m_validResult = 1;
78 RepoCommandParameter parameter;
79 try {
80 extractParameter(*interest, prefix, parameter);
81 }
82 catch (RepoCommandParameter::Error) {
83 negativeReply(*interest, 403);
84 return;
85 }
86
87 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
88 if (parameter.hasSelectors()) {
89 negativeReply(*interest, 402);
90 return;
91 }
92 processSegmentedInsertCommand(*interest, parameter);
93 }
94 else {
95 processSingleInsertCommand(*interest, parameter);
96 }
Weiqi Shi098f91c2014-07-23 17:41:35 -070097 if (parameter.hasInterestLifetime())
98 m_interestLifetime = parameter.getInterestLifetime();
Shuo Chen29c77fe2014-03-18 11:29:41 -070099}
100
101void
Shuo Chen028dcd32014-06-21 16:36:44 +0800102WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest, const string& reason)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700103{
Shuo Chen028dcd32014-06-21 16:36:44 +0800104 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700105 negativeReply(*interest, 401);
106}
107
108void
109WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
110{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800111 m_validator.validate(data,
112 bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
113 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
114}
115
116void
117WriteHandle::onDataValidated(const Interest& interest, const shared_ptr<const Data>& data,
118 ProcessId processId)
119{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700120 if (m_processes.count(processId) == 0) {
121 return;
122 }
123
124 ProcessInfo& process = m_processes[processId];
125 RepoCommandResponse& response = process.response;
126
127 if (response.getInsertNum() == 0) {
Shuo Chenc88c87d2014-06-25 20:21:02 +0800128 getStorageHandle().insertData(*data);
Weiqi Shif0330d52014-07-09 10:54:27 -0700129 // getStorageHandle().insertEntry(*data);
130 // getStoreIndex().insert(*data);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700131 response.setInsertNum(1);
132 }
133
134 deferredDeleteProcess(processId);
135}
136
137void
Shuo Chenc88c87d2014-06-25 20:21:02 +0800138WriteHandle::onDataValidationFailed(const shared_ptr<const Data>& data, const std::string& reason)
139{
140 std::cerr << reason << std::endl;
141}
142
143void
Shuo Chen29c77fe2014-03-18 11:29:41 -0700144WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
145{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800146 m_validator.validate(data,
147 bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId),
148 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
149}
150
151void
152WriteHandle::onSegmentDataValidated(const Interest& interest,
153 const shared_ptr<const Data>& data,
154 ProcessId processId)
155{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700156 if (m_processes.count(processId) == 0) {
157 return;
158 }
159 RepoCommandResponse& response = m_processes[processId].response;
160
161 //refresh endBlockId
Shuo Chenc88c87d2014-06-25 20:21:02 +0800162 Name::Component finalBlockId = data->getFinalBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700163
164 if (!finalBlockId.empty()) {
165 SegmentNo final = finalBlockId.toSegment();
166 if (response.hasEndBlockId()) {
167 if (final < response.getEndBlockId()) {
168 response.setEndBlockId(final);
169 }
170 }
171 else {
172 response.setEndBlockId(final);
173 }
174 }
175
176 //insert data
Shuo Chenc88c87d2014-06-25 20:21:02 +0800177 if (getStorageHandle().insertData(*data)) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700178 response.setInsertNum(response.getInsertNum() + 1);
179 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700180
181 onSegmentDataControl(processId, interest);
182}
183
184void
185WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
186{
Shuo Chen028dcd32014-06-21 16:36:44 +0800187 std::cerr << "Timeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700188 m_processes.erase(processId);
189}
190
191void
192WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
193{
Shuo Chen028dcd32014-06-21 16:36:44 +0800194 std::cerr << "SegTimeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700195
196 onSegmentTimeoutControl(processId, interest);
197}
198
199void
200WriteHandle::listen(const Name& prefix)
201{
202 Name insertPrefix;
203 insertPrefix.append(prefix).append("insert");
Weiqi Shif0330d52014-07-09 10:54:27 -0700204 ndn::InterestFilter filter_insert(insertPrefix);
205 getFace().setInterestFilter(filter_insert,
Shuo Chen29c77fe2014-03-18 11:29:41 -0700206 bind(&WriteHandle::onInterest, this, _1, _2),
207 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
208 Name insertCheckPrefix;
209 insertCheckPrefix.append(prefix).append("insert check");
Weiqi Shif0330d52014-07-09 10:54:27 -0700210 ndn::InterestFilter filter_insertCheck(insertCheckPrefix);
211 getFace().setInterestFilter(filter_insertCheck,
Shuo Chen29c77fe2014-03-18 11:29:41 -0700212 bind(&WriteHandle::onCheckInterest, this, _1, _2),
213 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
214}
215
216void
217WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
218{
219 ProcessInfo& process = m_processes[processId];
220 process.credit = 0;
221
222 map<SegmentNo, int>& processRetry = process.retryCounts;
223
224 Name name = parameter.getName();
225 SegmentNo startBlockId = parameter.getStartBlockId();
226
227 uint64_t initialCredit = m_credit;
228
229 if (parameter.hasEndBlockId()) {
230 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800231 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700232 }
233 else {
234 // set noEndTimeout timer
235 process.noEndTime = ndn::time::steady_clock::now() +
236 m_noEndTimeout;
237 }
238 process.credit = initialCredit;
239 SegmentNo segment = startBlockId;
Weiqi Shi098f91c2014-07-23 17:41:35 -0700240
Shuo Chen29c77fe2014-03-18 11:29:41 -0700241 for (; segment < startBlockId + initialCredit; ++segment) {
242 Name fetchName = name;
243 fetchName.appendSegment(segment);
244 Interest interest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700245 interest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700246 getFace().expressInterest(interest,
247 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
248 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
249 process.credit--;
250 processRetry[segment] = 0;
251 }
252
253 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
254
Shuo Chen29c77fe2014-03-18 11:29:41 -0700255 process.nextSegment = segment;
256 nextSegmentQueue.push(segment);
257}
258
259void
260WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
261{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700262 if (m_processes.count(processId) == 0) {
263 return;
264 }
265 ProcessInfo& process = m_processes[processId];
266 RepoCommandResponse& response = process.response;
267 int& processCredit = process.credit;
268 //onSegmentDataControl is called when a data returns.
269 //When data returns, processCredit++
270 processCredit++;
271 SegmentNo& nextSegment = process.nextSegment;
272 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
273 map<SegmentNo, int>& retryCounts = process.retryCounts;
274
275 //read whether notime timeout
276 if (!response.hasEndBlockId()) {
277
278 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
279 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
280
281 if (now > noEndTime) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800282 std::cerr << "noEndtimeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700283 //m_processes.erase(processId);
284 //StatusCode should be refreshed as 405
285 response.setStatusCode(405);
286 //schedule a delete event
287 deferredDeleteProcess(processId);
288 return;
289 }
290 }
291
292 //read whether this process has total ends, if ends, remove control info from the maps
293 if (response.hasEndBlockId()) {
294 uint64_t nSegments =
295 response.getEndBlockId() - response.getStartBlockId() + 1;
296 if (response.getInsertNum() >= nSegments) {
297 //m_processes.erase(processId);
298 //All the data has been inserted, StatusCode is refreshed as 200
299 response.setStatusCode(200);
300 deferredDeleteProcess(processId);
301 return;
302 }
303 }
304
305 //check whether there is any credit
306 if (processCredit == 0)
307 return;
308
309
310 //check whether sent queue empty
311 if (nextSegmentQueue.empty()) {
312 //do not do anything
313 return;
314 }
315
316 //pop the queue
317 SegmentNo sendingSegment = nextSegmentQueue.front();
318 nextSegmentQueue.pop();
319
320 //check whether sendingSegment exceeds
Shuo Chenccfbe242014-04-29 23:57:51 +0800321 if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700322 //do not do anything
323 return;
324 }
325
326 //read whether this is retransmitted data;
327 SegmentNo fetchedSegment =
328 interest.getName().get(interest.getName().size() - 1).toSegment();
329
330 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
331
332 //find this fetched data, remove it from this map
333 //rit->second.erase(oit);
334 retryCounts.erase(fetchedSegment);
335 //express the interest of the top of the queue
336 Name fetchName(interest.getName().getPrefix(-1));
337 fetchName.appendSegment(sendingSegment);
338 Interest fetchInterest(fetchName);
Weiqi Shi098f91c2014-07-23 17:41:35 -0700339 fetchInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700340 getFace().expressInterest(fetchInterest,
341 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
342 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
343 //When an interest is expressed, processCredit--
344 processCredit--;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700345 if (retryCounts.count(sendingSegment) == 0) {
346 //not found
347 retryCounts[sendingSegment] = 0;
348 }
349 else {
350 //found
351 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
352 }
353 //increase the next seg and put it into the queue
Shuo Chenccfbe242014-04-29 23:57:51 +0800354 if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700355 nextSegment++;
356 nextSegmentQueue.push(nextSegment);
357 }
358}
359
360void
361WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
362{
363 if (m_processes.count(processId) == 0) {
364 return;
365 }
366 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700367 // RepoCommandResponse& response = process.response;
368 // SegmentNo& nextSegment = process.nextSegment;
369 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700370 map<SegmentNo, int>& retryCounts = process.retryCounts;
371
372 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
373
Shuo Chen028dcd32014-06-21 16:36:44 +0800374 std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700375
376 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
377
378 //read the retry time. If retry out of time, fail the process. if not, plus
379 int& retryTime = retryCounts[timeoutSegment];
380 if (retryTime >= m_retryTime) {
381 //fail this process
Shuo Chen028dcd32014-06-21 16:36:44 +0800382 std::cerr << "Retry timeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700383 m_processes.erase(processId);
384 return;
385 }
386 else {
387 //Reput it in the queue, retryTime++
388 retryTime++;
389 Interest retryInterest(interest.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700390 retryInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700391 getFace().expressInterest(retryInterest,
392 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
393 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
394 }
395
396}
397
398void
399WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
400{
401 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +0800402 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
403 bind(&WriteHandle::onCheckValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700404
405}
406
407void
408WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
409{
410 RepoCommandParameter parameter;
411 try {
412 extractParameter(*interest, prefix, parameter);
413 }
414 catch (RepoCommandParameter::Error) {
415 negativeReply(*interest, 403);
416 return;
417 }
418
419 if (!parameter.hasProcessId()) {
420 negativeReply(*interest, 403);
421 return;
422 }
423 //check whether this process exists
424 ProcessId processId = parameter.getProcessId();
425 if (m_processes.count(processId) == 0) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800426 std::cerr << "no such processId: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700427 negativeReply(*interest, 404);
428 return;
429 }
430
431 ProcessInfo& process = m_processes[processId];
432
433 RepoCommandResponse& response = process.response;
434
435 //Check whether it is single data fetching
436 if (!response.hasStartBlockId() &&
437 !response.hasEndBlockId()) {
438 reply(*interest, response);
439 return;
440 }
441
442 //read if noEndtimeout
443 if (!response.hasEndBlockId()) {
444 extendNoEndTime(process);
445 reply(*interest, response);
446 return;
447 }
448 else {
449 reply(*interest, response);
450 }
451}
452
453void
Shuo Chenc88c87d2014-06-25 20:21:02 +0800454WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest,
455 const std::string& reason)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700456{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800457 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700458 negativeReply(*interest, 401);
459}
460
461void
462WriteHandle::deferredDeleteProcess(ProcessId processId)
463{
464 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
465 ndn::bind(&WriteHandle::deleteProcess, this, processId));
466}
467
468void
469WriteHandle::processSingleInsertCommand(const Interest& interest,
470 RepoCommandParameter& parameter)
471{
472 ProcessId processId = generateProcessId();
473
474 ProcessInfo& process = m_processes[processId];
475
476 RepoCommandResponse& response = process.response;
477 response.setStatusCode(100);
478 response.setProcessId(processId);
479 response.setInsertNum(0);
480
481 reply(interest, response);
482
483 response.setStatusCode(300);
484
485 Interest fetchInterest(parameter.getName());
Weiqi Shi098f91c2014-07-23 17:41:35 -0700486 fetchInterest.setInterestLifetime(m_interestLifetime);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700487 if (parameter.hasSelectors()) {
488 fetchInterest.setSelectors(parameter.getSelectors());
489 }
490 getFace().expressInterest(fetchInterest,
491 bind(&WriteHandle::onData, this, _1, _2, processId),
492 bind(&WriteHandle::onTimeout, this, _1, processId));
493}
494
495void
496WriteHandle::processSegmentedInsertCommand(const Interest& interest,
497 RepoCommandParameter& parameter)
498{
499 if (parameter.hasEndBlockId()) {
500 //normal fetch segment
501 if (!parameter.hasStartBlockId()) {
502 parameter.setStartBlockId(0);
503 }
504
505 SegmentNo startBlockId = parameter.getStartBlockId();
506 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700507 if (startBlockId > endBlockId) {
508 negativeReply(interest, 403);
509 return;
510 }
511
512 ProcessId processId = generateProcessId();
513 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700514 RepoCommandResponse& response = process.response;
515 response.setStatusCode(100);
516 response.setProcessId(processId);
517 response.setInsertNum(0);
518 response.setStartBlockId(startBlockId);
519 response.setEndBlockId(endBlockId);
520
521 reply(interest, response);
522
523 //300 means data fetching is in progress
524 response.setStatusCode(300);
525
526 segInit(processId, parameter);
527 }
528 else {
529 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
530 ProcessId processId = generateProcessId();
531 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700532 RepoCommandResponse& response = process.response;
533 response.setStatusCode(100);
534 response.setProcessId(processId);
535 response.setInsertNum(0);
536 response.setStartBlockId(parameter.getStartBlockId());
537 reply(interest, response);
538
539 //300 means data fetching is in progress
540 response.setStatusCode(300);
541
542 segInit(processId, parameter);
543 }
544}
545
546void
547WriteHandle::extendNoEndTime(ProcessInfo& process)
548{
549 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
550 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
551 RepoCommandResponse& response = process.response;
552 if (now > noEndTime) {
553 response.setStatusCode(405);
554 return;
555 }
556 //extends noEndTime
557 process.noEndTime =
558 ndn::time::steady_clock::now() + m_noEndTimeout;
559
560}
561
562void
563WriteHandle::negativeReply(const Interest& interest, int statusCode)
564{
565 RepoCommandResponse response;
566 response.setStatusCode(statusCode);
567 reply(interest, response);
568}
569
570} //namespace repo