blob: 07a5664d77f2ae616ee9405611d4f43ac879d6d6 [file] [log] [blame]
Shuo Chen29c77fe2014-03-18 11:29:41 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07003 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Shuo Chen29c77fe2014-03-18 11:29:41 -070018 */
19
20#include "write-handle.hpp"
21
22namespace repo {
23
24static const int RETRY_TIMEOUT = 3;
25static const int DEFAULT_CREDIT = 12;
26static const ndn::time::milliseconds NOEND_TIMEOUT(10000);
27static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000);
28
29WriteHandle::WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
Shuo Chen028dcd32014-06-21 16:36:44 +080030 Scheduler& scheduler, ValidatorConfig& validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070031 : BaseHandle(face, storageHandle, keyChain, scheduler)
32 , m_validator(validator)
33 , m_retryTime(RETRY_TIMEOUT)
34 , m_credit(DEFAULT_CREDIT)
35 , m_noEndTimeout(NOEND_TIMEOUT)
36{
37}
38
39void
40WriteHandle::deleteProcess(ProcessId processId)
41{
42 m_processes.erase(processId);
43}
44
45// Interest.
46void
47WriteHandle::onInterest(const Name& prefix, const Interest& interest)
48{
49 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +080050 bind(&WriteHandle::onValidated, this, _1, prefix),
51 bind(&WriteHandle::onValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -070052}
53
Wentao Shang91fb4f22014-05-20 10:55:22 -070054void
55WriteHandle::onRegisterSuccess(const Name& prefix)
56{
57 std::cerr << "Successfully registered prefix " << prefix << std::endl;
58}
59
Shuo Chen29c77fe2014-03-18 11:29:41 -070060// onRegisterFailed.
61void
62WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
63{
Shuo Chenc88c87d2014-06-25 20:21:02 +080064 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -070065 throw Error("Insert prefix registration failed");
66}
67
68// onRegisterFailed for insert.
69void
70WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
71{
Shuo Chenc88c87d2014-06-25 20:21:02 +080072 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -070073 throw Error("Insert check prefix registration failed");
74}
75
76void
77WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
78{
79 //m_validResult = 1;
80 RepoCommandParameter parameter;
81 try {
82 extractParameter(*interest, prefix, parameter);
83 }
84 catch (RepoCommandParameter::Error) {
85 negativeReply(*interest, 403);
86 return;
87 }
88
89 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
90 if (parameter.hasSelectors()) {
91 negativeReply(*interest, 402);
92 return;
93 }
94 processSegmentedInsertCommand(*interest, parameter);
95 }
96 else {
97 processSingleInsertCommand(*interest, parameter);
98 }
99
100}
101
102void
Shuo Chen028dcd32014-06-21 16:36:44 +0800103WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest, const string& reason)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700104{
Shuo Chen028dcd32014-06-21 16:36:44 +0800105 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700106 negativeReply(*interest, 401);
107}
108
109void
110WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
111{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800112 m_validator.validate(data,
113 bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
114 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
115}
116
117void
118WriteHandle::onDataValidated(const Interest& interest, const shared_ptr<const Data>& data,
119 ProcessId processId)
120{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700121 if (m_processes.count(processId) == 0) {
122 return;
123 }
124
125 ProcessInfo& process = m_processes[processId];
126 RepoCommandResponse& response = process.response;
127
128 if (response.getInsertNum() == 0) {
Shuo Chenc88c87d2014-06-25 20:21:02 +0800129 getStorageHandle().insertData(*data);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700130 response.setInsertNum(1);
131 }
132
133 deferredDeleteProcess(processId);
134}
135
136void
Shuo Chenc88c87d2014-06-25 20:21:02 +0800137WriteHandle::onDataValidationFailed(const shared_ptr<const Data>& data, const std::string& reason)
138{
139 std::cerr << reason << std::endl;
140}
141
142void
Shuo Chen29c77fe2014-03-18 11:29:41 -0700143WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
144{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800145 m_validator.validate(data,
146 bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId),
147 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
148}
149
150void
151WriteHandle::onSegmentDataValidated(const Interest& interest,
152 const shared_ptr<const Data>& data,
153 ProcessId processId)
154{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700155 if (m_processes.count(processId) == 0) {
156 return;
157 }
158 RepoCommandResponse& response = m_processes[processId].response;
159
160 //refresh endBlockId
Shuo Chenc88c87d2014-06-25 20:21:02 +0800161 Name::Component finalBlockId = data->getFinalBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700162
163 if (!finalBlockId.empty()) {
164 SegmentNo final = finalBlockId.toSegment();
165 if (response.hasEndBlockId()) {
166 if (final < response.getEndBlockId()) {
167 response.setEndBlockId(final);
168 }
169 }
170 else {
171 response.setEndBlockId(final);
172 }
173 }
174
175 //insert data
Shuo Chenc88c87d2014-06-25 20:21:02 +0800176 if (getStorageHandle().insertData(*data)) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700177 response.setInsertNum(response.getInsertNum() + 1);
178 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700179
180 onSegmentDataControl(processId, interest);
181}
182
183void
184WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
185{
Shuo Chen028dcd32014-06-21 16:36:44 +0800186 std::cerr << "Timeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700187 m_processes.erase(processId);
188}
189
190void
191WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
192{
Shuo Chen028dcd32014-06-21 16:36:44 +0800193 std::cerr << "SegTimeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700194
195 onSegmentTimeoutControl(processId, interest);
196}
197
198void
199WriteHandle::listen(const Name& prefix)
200{
201 Name insertPrefix;
202 insertPrefix.append(prefix).append("insert");
203 getFace().setInterestFilter(insertPrefix,
204 bind(&WriteHandle::onInterest, this, _1, _2),
Wentao Shang91fb4f22014-05-20 10:55:22 -0700205 bind(&WriteHandle::onRegisterSuccess, this, _1),
Shuo Chen29c77fe2014-03-18 11:29:41 -0700206 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
207 Name insertCheckPrefix;
208 insertCheckPrefix.append(prefix).append("insert check");
209 getFace().setInterestFilter(insertCheckPrefix,
210 bind(&WriteHandle::onCheckInterest, this, _1, _2),
Wentao Shang91fb4f22014-05-20 10:55:22 -0700211 bind(&WriteHandle::onRegisterSuccess, this, _1),
Shuo Chen29c77fe2014-03-18 11:29:41 -0700212 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
213}
214
215void
216WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
217{
218 ProcessInfo& process = m_processes[processId];
219 process.credit = 0;
220
221 map<SegmentNo, int>& processRetry = process.retryCounts;
222
223 Name name = parameter.getName();
224 SegmentNo startBlockId = parameter.getStartBlockId();
225
226 uint64_t initialCredit = m_credit;
227
228 if (parameter.hasEndBlockId()) {
229 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800230 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700231 }
232 else {
233 // set noEndTimeout timer
234 process.noEndTime = ndn::time::steady_clock::now() +
235 m_noEndTimeout;
236 }
237 process.credit = initialCredit;
238 SegmentNo segment = startBlockId;
239 for (; segment < startBlockId + initialCredit; ++segment) {
240 Name fetchName = name;
241 fetchName.appendSegment(segment);
242 Interest interest(fetchName);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700243 getFace().expressInterest(interest,
244 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
245 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
246 process.credit--;
247 processRetry[segment] = 0;
248 }
249
250 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
251
Shuo Chen29c77fe2014-03-18 11:29:41 -0700252 process.nextSegment = segment;
253 nextSegmentQueue.push(segment);
254}
255
256void
257WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
258{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700259 if (m_processes.count(processId) == 0) {
260 return;
261 }
262 ProcessInfo& process = m_processes[processId];
263 RepoCommandResponse& response = process.response;
264 int& processCredit = process.credit;
265 //onSegmentDataControl is called when a data returns.
266 //When data returns, processCredit++
267 processCredit++;
268 SegmentNo& nextSegment = process.nextSegment;
269 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
270 map<SegmentNo, int>& retryCounts = process.retryCounts;
271
272 //read whether notime timeout
273 if (!response.hasEndBlockId()) {
274
275 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
276 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
277
278 if (now > noEndTime) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800279 std::cerr << "noEndtimeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700280 //m_processes.erase(processId);
281 //StatusCode should be refreshed as 405
282 response.setStatusCode(405);
283 //schedule a delete event
284 deferredDeleteProcess(processId);
285 return;
286 }
287 }
288
289 //read whether this process has total ends, if ends, remove control info from the maps
290 if (response.hasEndBlockId()) {
291 uint64_t nSegments =
292 response.getEndBlockId() - response.getStartBlockId() + 1;
293 if (response.getInsertNum() >= nSegments) {
294 //m_processes.erase(processId);
295 //All the data has been inserted, StatusCode is refreshed as 200
296 response.setStatusCode(200);
297 deferredDeleteProcess(processId);
298 return;
299 }
300 }
301
302 //check whether there is any credit
303 if (processCredit == 0)
304 return;
305
306
307 //check whether sent queue empty
308 if (nextSegmentQueue.empty()) {
309 //do not do anything
310 return;
311 }
312
313 //pop the queue
314 SegmentNo sendingSegment = nextSegmentQueue.front();
315 nextSegmentQueue.pop();
316
317 //check whether sendingSegment exceeds
Shuo Chenccfbe242014-04-29 23:57:51 +0800318 if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700319 //do not do anything
320 return;
321 }
322
323 //read whether this is retransmitted data;
324 SegmentNo fetchedSegment =
325 interest.getName().get(interest.getName().size() - 1).toSegment();
326
327 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
328
329 //find this fetched data, remove it from this map
330 //rit->second.erase(oit);
331 retryCounts.erase(fetchedSegment);
332 //express the interest of the top of the queue
333 Name fetchName(interest.getName().getPrefix(-1));
334 fetchName.appendSegment(sendingSegment);
335 Interest fetchInterest(fetchName);
336 getFace().expressInterest(fetchInterest,
337 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
338 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
339 //When an interest is expressed, processCredit--
340 processCredit--;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700341 if (retryCounts.count(sendingSegment) == 0) {
342 //not found
343 retryCounts[sendingSegment] = 0;
344 }
345 else {
346 //found
347 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
348 }
349 //increase the next seg and put it into the queue
Shuo Chenccfbe242014-04-29 23:57:51 +0800350 if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700351 nextSegment++;
352 nextSegmentQueue.push(nextSegment);
353 }
354}
355
356void
357WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
358{
359 if (m_processes.count(processId) == 0) {
360 return;
361 }
362 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700363 // RepoCommandResponse& response = process.response;
364 // SegmentNo& nextSegment = process.nextSegment;
365 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700366 map<SegmentNo, int>& retryCounts = process.retryCounts;
367
368 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
369
Shuo Chen028dcd32014-06-21 16:36:44 +0800370 std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700371
372 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
373
374 //read the retry time. If retry out of time, fail the process. if not, plus
375 int& retryTime = retryCounts[timeoutSegment];
376 if (retryTime >= m_retryTime) {
377 //fail this process
Shuo Chen028dcd32014-06-21 16:36:44 +0800378 std::cerr << "Retry timeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700379 m_processes.erase(processId);
380 return;
381 }
382 else {
383 //Reput it in the queue, retryTime++
384 retryTime++;
385 Interest retryInterest(interest.getName());
386 getFace().expressInterest(retryInterest,
387 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
388 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
389 }
390
391}
392
393void
394WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
395{
396 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +0800397 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
398 bind(&WriteHandle::onCheckValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700399
400}
401
402void
403WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
404{
405 RepoCommandParameter parameter;
406 try {
407 extractParameter(*interest, prefix, parameter);
408 }
409 catch (RepoCommandParameter::Error) {
410 negativeReply(*interest, 403);
411 return;
412 }
413
414 if (!parameter.hasProcessId()) {
415 negativeReply(*interest, 403);
416 return;
417 }
418 //check whether this process exists
419 ProcessId processId = parameter.getProcessId();
420 if (m_processes.count(processId) == 0) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800421 std::cerr << "no such processId: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700422 negativeReply(*interest, 404);
423 return;
424 }
425
426 ProcessInfo& process = m_processes[processId];
427
428 RepoCommandResponse& response = process.response;
429
430 //Check whether it is single data fetching
431 if (!response.hasStartBlockId() &&
432 !response.hasEndBlockId()) {
433 reply(*interest, response);
434 return;
435 }
436
437 //read if noEndtimeout
438 if (!response.hasEndBlockId()) {
439 extendNoEndTime(process);
440 reply(*interest, response);
441 return;
442 }
443 else {
444 reply(*interest, response);
445 }
446}
447
448void
Shuo Chenc88c87d2014-06-25 20:21:02 +0800449WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest,
450 const std::string& reason)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700451{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800452 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700453 negativeReply(*interest, 401);
454}
455
456void
457WriteHandle::deferredDeleteProcess(ProcessId processId)
458{
459 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
460 ndn::bind(&WriteHandle::deleteProcess, this, processId));
461}
462
463void
464WriteHandle::processSingleInsertCommand(const Interest& interest,
465 RepoCommandParameter& parameter)
466{
467 ProcessId processId = generateProcessId();
468
469 ProcessInfo& process = m_processes[processId];
470
471 RepoCommandResponse& response = process.response;
472 response.setStatusCode(100);
473 response.setProcessId(processId);
474 response.setInsertNum(0);
475
476 reply(interest, response);
477
478 response.setStatusCode(300);
479
480 Interest fetchInterest(parameter.getName());
481 if (parameter.hasSelectors()) {
482 fetchInterest.setSelectors(parameter.getSelectors());
483 }
484 getFace().expressInterest(fetchInterest,
485 bind(&WriteHandle::onData, this, _1, _2, processId),
486 bind(&WriteHandle::onTimeout, this, _1, processId));
487}
488
489void
490WriteHandle::processSegmentedInsertCommand(const Interest& interest,
491 RepoCommandParameter& parameter)
492{
493 if (parameter.hasEndBlockId()) {
494 //normal fetch segment
495 if (!parameter.hasStartBlockId()) {
496 parameter.setStartBlockId(0);
497 }
498
499 SegmentNo startBlockId = parameter.getStartBlockId();
500 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700501 if (startBlockId > endBlockId) {
502 negativeReply(interest, 403);
503 return;
504 }
505
506 ProcessId processId = generateProcessId();
507 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700508 RepoCommandResponse& response = process.response;
509 response.setStatusCode(100);
510 response.setProcessId(processId);
511 response.setInsertNum(0);
512 response.setStartBlockId(startBlockId);
513 response.setEndBlockId(endBlockId);
514
515 reply(interest, response);
516
517 //300 means data fetching is in progress
518 response.setStatusCode(300);
519
520 segInit(processId, parameter);
521 }
522 else {
523 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
524 ProcessId processId = generateProcessId();
525 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700526 RepoCommandResponse& response = process.response;
527 response.setStatusCode(100);
528 response.setProcessId(processId);
529 response.setInsertNum(0);
530 response.setStartBlockId(parameter.getStartBlockId());
531 reply(interest, response);
532
533 //300 means data fetching is in progress
534 response.setStatusCode(300);
535
536 segInit(processId, parameter);
537 }
538}
539
540void
541WriteHandle::extendNoEndTime(ProcessInfo& process)
542{
543 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
544 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
545 RepoCommandResponse& response = process.response;
546 if (now > noEndTime) {
547 response.setStatusCode(405);
548 return;
549 }
550 //extends noEndTime
551 process.noEndTime =
552 ndn::time::steady_clock::now() + m_noEndTimeout;
553
554}
555
556void
557WriteHandle::negativeReply(const Interest& interest, int statusCode)
558{
559 RepoCommandResponse response;
560 response.setStatusCode(statusCode);
561 reply(interest, response);
562}
563
564} //namespace repo