blob: 65098bf016641566caea494c733f7a11c9ad16e5 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2014, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "write-handle.hpp"
21
22namespace repo {
23
24static const int RETRY_TIMEOUT = 3;
25static const int DEFAULT_CREDIT = 12;
26static const ndn::time::milliseconds NOEND_TIMEOUT(10000);
27static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000);
28
Weiqi Shif0330d52014-07-09 10:54:27 -070029WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
30 Scheduler& scheduler,// RepoStorage& storeindex,
31 ValidatorConfig& validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070032 : BaseHandle(face, storageHandle, keyChain, scheduler)
33 , m_validator(validator)
34 , m_retryTime(RETRY_TIMEOUT)
35 , m_credit(DEFAULT_CREDIT)
36 , m_noEndTimeout(NOEND_TIMEOUT)
37{
38}
39
40void
41WriteHandle::deleteProcess(ProcessId processId)
42{
43 m_processes.erase(processId);
44}
45
46// Interest.
47void
48WriteHandle::onInterest(const Name& prefix, const Interest& interest)
49{
50 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +080051 bind(&WriteHandle::onValidated, this, _1, prefix),
52 bind(&WriteHandle::onValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -070053}
54
55// onRegisterFailed.
56void
57WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
58{
Shuo Chenc88c87d2014-06-25 20:21:02 +080059 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -070060 throw Error("Insert prefix registration failed");
61}
62
63// onRegisterFailed for insert.
64void
65WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
66{
Shuo Chenc88c87d2014-06-25 20:21:02 +080067 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -070068 throw Error("Insert check prefix registration failed");
69}
70
71void
72WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
73{
74 //m_validResult = 1;
75 RepoCommandParameter parameter;
76 try {
77 extractParameter(*interest, prefix, parameter);
78 }
79 catch (RepoCommandParameter::Error) {
80 negativeReply(*interest, 403);
81 return;
82 }
83
84 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
85 if (parameter.hasSelectors()) {
86 negativeReply(*interest, 402);
87 return;
88 }
89 processSegmentedInsertCommand(*interest, parameter);
90 }
91 else {
92 processSingleInsertCommand(*interest, parameter);
93 }
94
95}
96
97void
Shuo Chen028dcd32014-06-21 16:36:44 +080098WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest, const string& reason)
Shuo Chen29c77fe2014-03-18 11:29:41 -070099{
Shuo Chen028dcd32014-06-21 16:36:44 +0800100 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700101 negativeReply(*interest, 401);
102}
103
104void
105WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
106{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800107 m_validator.validate(data,
108 bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
109 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
110}
111
112void
113WriteHandle::onDataValidated(const Interest& interest, const shared_ptr<const Data>& data,
114 ProcessId processId)
115{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700116 if (m_processes.count(processId) == 0) {
117 return;
118 }
119
120 ProcessInfo& process = m_processes[processId];
121 RepoCommandResponse& response = process.response;
122
123 if (response.getInsertNum() == 0) {
Shuo Chenc88c87d2014-06-25 20:21:02 +0800124 getStorageHandle().insertData(*data);
Weiqi Shif0330d52014-07-09 10:54:27 -0700125 // getStorageHandle().insertEntry(*data);
126 // getStoreIndex().insert(*data);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700127 response.setInsertNum(1);
128 }
129
130 deferredDeleteProcess(processId);
131}
132
133void
Shuo Chenc88c87d2014-06-25 20:21:02 +0800134WriteHandle::onDataValidationFailed(const shared_ptr<const Data>& data, const std::string& reason)
135{
136 std::cerr << reason << std::endl;
137}
138
139void
Shuo Chen29c77fe2014-03-18 11:29:41 -0700140WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
141{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800142 m_validator.validate(data,
143 bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId),
144 bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
145}
146
147void
148WriteHandle::onSegmentDataValidated(const Interest& interest,
149 const shared_ptr<const Data>& data,
150 ProcessId processId)
151{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700152 if (m_processes.count(processId) == 0) {
153 return;
154 }
155 RepoCommandResponse& response = m_processes[processId].response;
156
157 //refresh endBlockId
Shuo Chenc88c87d2014-06-25 20:21:02 +0800158 Name::Component finalBlockId = data->getFinalBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700159
160 if (!finalBlockId.empty()) {
161 SegmentNo final = finalBlockId.toSegment();
162 if (response.hasEndBlockId()) {
163 if (final < response.getEndBlockId()) {
164 response.setEndBlockId(final);
165 }
166 }
167 else {
168 response.setEndBlockId(final);
169 }
170 }
171
172 //insert data
Shuo Chenc88c87d2014-06-25 20:21:02 +0800173 if (getStorageHandle().insertData(*data)) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700174 response.setInsertNum(response.getInsertNum() + 1);
175 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700176
177 onSegmentDataControl(processId, interest);
178}
179
180void
181WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
182{
Shuo Chen028dcd32014-06-21 16:36:44 +0800183 std::cerr << "Timeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700184 m_processes.erase(processId);
185}
186
187void
188WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
189{
Shuo Chen028dcd32014-06-21 16:36:44 +0800190 std::cerr << "SegTimeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700191
192 onSegmentTimeoutControl(processId, interest);
193}
194
195void
196WriteHandle::listen(const Name& prefix)
197{
198 Name insertPrefix;
199 insertPrefix.append(prefix).append("insert");
Weiqi Shif0330d52014-07-09 10:54:27 -0700200 ndn::InterestFilter filter_insert(insertPrefix);
201 getFace().setInterestFilter(filter_insert,
Shuo Chen29c77fe2014-03-18 11:29:41 -0700202 bind(&WriteHandle::onInterest, this, _1, _2),
203 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
204 Name insertCheckPrefix;
205 insertCheckPrefix.append(prefix).append("insert check");
Weiqi Shif0330d52014-07-09 10:54:27 -0700206 ndn::InterestFilter filter_insertCheck(insertCheckPrefix);
207 getFace().setInterestFilter(filter_insertCheck,
Shuo Chen29c77fe2014-03-18 11:29:41 -0700208 bind(&WriteHandle::onCheckInterest, this, _1, _2),
209 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
210}
211
212void
213WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
214{
215 ProcessInfo& process = m_processes[processId];
216 process.credit = 0;
217
218 map<SegmentNo, int>& processRetry = process.retryCounts;
219
220 Name name = parameter.getName();
221 SegmentNo startBlockId = parameter.getStartBlockId();
222
223 uint64_t initialCredit = m_credit;
224
225 if (parameter.hasEndBlockId()) {
226 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800227 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700228 }
229 else {
230 // set noEndTimeout timer
231 process.noEndTime = ndn::time::steady_clock::now() +
232 m_noEndTimeout;
233 }
234 process.credit = initialCredit;
235 SegmentNo segment = startBlockId;
236 for (; segment < startBlockId + initialCredit; ++segment) {
237 Name fetchName = name;
238 fetchName.appendSegment(segment);
239 Interest interest(fetchName);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700240 getFace().expressInterest(interest,
241 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
242 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
243 process.credit--;
244 processRetry[segment] = 0;
245 }
246
247 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
248
Shuo Chen29c77fe2014-03-18 11:29:41 -0700249 process.nextSegment = segment;
250 nextSegmentQueue.push(segment);
251}
252
253void
254WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
255{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700256 if (m_processes.count(processId) == 0) {
257 return;
258 }
259 ProcessInfo& process = m_processes[processId];
260 RepoCommandResponse& response = process.response;
261 int& processCredit = process.credit;
262 //onSegmentDataControl is called when a data returns.
263 //When data returns, processCredit++
264 processCredit++;
265 SegmentNo& nextSegment = process.nextSegment;
266 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
267 map<SegmentNo, int>& retryCounts = process.retryCounts;
268
269 //read whether notime timeout
270 if (!response.hasEndBlockId()) {
271
272 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
273 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
274
275 if (now > noEndTime) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800276 std::cerr << "noEndtimeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700277 //m_processes.erase(processId);
278 //StatusCode should be refreshed as 405
279 response.setStatusCode(405);
280 //schedule a delete event
281 deferredDeleteProcess(processId);
282 return;
283 }
284 }
285
286 //read whether this process has total ends, if ends, remove control info from the maps
287 if (response.hasEndBlockId()) {
288 uint64_t nSegments =
289 response.getEndBlockId() - response.getStartBlockId() + 1;
290 if (response.getInsertNum() >= nSegments) {
291 //m_processes.erase(processId);
292 //All the data has been inserted, StatusCode is refreshed as 200
293 response.setStatusCode(200);
294 deferredDeleteProcess(processId);
295 return;
296 }
297 }
298
299 //check whether there is any credit
300 if (processCredit == 0)
301 return;
302
303
304 //check whether sent queue empty
305 if (nextSegmentQueue.empty()) {
306 //do not do anything
307 return;
308 }
309
310 //pop the queue
311 SegmentNo sendingSegment = nextSegmentQueue.front();
312 nextSegmentQueue.pop();
313
314 //check whether sendingSegment exceeds
Shuo Chenccfbe242014-04-29 23:57:51 +0800315 if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700316 //do not do anything
317 return;
318 }
319
320 //read whether this is retransmitted data;
321 SegmentNo fetchedSegment =
322 interest.getName().get(interest.getName().size() - 1).toSegment();
323
324 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
325
326 //find this fetched data, remove it from this map
327 //rit->second.erase(oit);
328 retryCounts.erase(fetchedSegment);
329 //express the interest of the top of the queue
330 Name fetchName(interest.getName().getPrefix(-1));
331 fetchName.appendSegment(sendingSegment);
332 Interest fetchInterest(fetchName);
333 getFace().expressInterest(fetchInterest,
334 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
335 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
336 //When an interest is expressed, processCredit--
337 processCredit--;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700338 if (retryCounts.count(sendingSegment) == 0) {
339 //not found
340 retryCounts[sendingSegment] = 0;
341 }
342 else {
343 //found
344 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
345 }
346 //increase the next seg and put it into the queue
Shuo Chenccfbe242014-04-29 23:57:51 +0800347 if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700348 nextSegment++;
349 nextSegmentQueue.push(nextSegment);
350 }
351}
352
353void
354WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
355{
356 if (m_processes.count(processId) == 0) {
357 return;
358 }
359 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700360 // RepoCommandResponse& response = process.response;
361 // SegmentNo& nextSegment = process.nextSegment;
362 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700363 map<SegmentNo, int>& retryCounts = process.retryCounts;
364
365 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
366
Shuo Chen028dcd32014-06-21 16:36:44 +0800367 std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700368
369 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
370
371 //read the retry time. If retry out of time, fail the process. if not, plus
372 int& retryTime = retryCounts[timeoutSegment];
373 if (retryTime >= m_retryTime) {
374 //fail this process
Shuo Chen028dcd32014-06-21 16:36:44 +0800375 std::cerr << "Retry timeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700376 m_processes.erase(processId);
377 return;
378 }
379 else {
380 //Reput it in the queue, retryTime++
381 retryTime++;
382 Interest retryInterest(interest.getName());
383 getFace().expressInterest(retryInterest,
384 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
385 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
386 }
387
388}
389
390void
391WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
392{
393 m_validator.validate(interest,
Shuo Chenc88c87d2014-06-25 20:21:02 +0800394 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
395 bind(&WriteHandle::onCheckValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -0700396
397}
398
399void
400WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
401{
402 RepoCommandParameter parameter;
403 try {
404 extractParameter(*interest, prefix, parameter);
405 }
406 catch (RepoCommandParameter::Error) {
407 negativeReply(*interest, 403);
408 return;
409 }
410
411 if (!parameter.hasProcessId()) {
412 negativeReply(*interest, 403);
413 return;
414 }
415 //check whether this process exists
416 ProcessId processId = parameter.getProcessId();
417 if (m_processes.count(processId) == 0) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800418 std::cerr << "no such processId: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700419 negativeReply(*interest, 404);
420 return;
421 }
422
423 ProcessInfo& process = m_processes[processId];
424
425 RepoCommandResponse& response = process.response;
426
427 //Check whether it is single data fetching
428 if (!response.hasStartBlockId() &&
429 !response.hasEndBlockId()) {
430 reply(*interest, response);
431 return;
432 }
433
434 //read if noEndtimeout
435 if (!response.hasEndBlockId()) {
436 extendNoEndTime(process);
437 reply(*interest, response);
438 return;
439 }
440 else {
441 reply(*interest, response);
442 }
443}
444
445void
Shuo Chenc88c87d2014-06-25 20:21:02 +0800446WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest,
447 const std::string& reason)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700448{
Shuo Chenc88c87d2014-06-25 20:21:02 +0800449 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700450 negativeReply(*interest, 401);
451}
452
453void
454WriteHandle::deferredDeleteProcess(ProcessId processId)
455{
456 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
457 ndn::bind(&WriteHandle::deleteProcess, this, processId));
458}
459
460void
461WriteHandle::processSingleInsertCommand(const Interest& interest,
462 RepoCommandParameter& parameter)
463{
464 ProcessId processId = generateProcessId();
465
466 ProcessInfo& process = m_processes[processId];
467
468 RepoCommandResponse& response = process.response;
469 response.setStatusCode(100);
470 response.setProcessId(processId);
471 response.setInsertNum(0);
472
473 reply(interest, response);
474
475 response.setStatusCode(300);
476
477 Interest fetchInterest(parameter.getName());
478 if (parameter.hasSelectors()) {
479 fetchInterest.setSelectors(parameter.getSelectors());
480 }
481 getFace().expressInterest(fetchInterest,
482 bind(&WriteHandle::onData, this, _1, _2, processId),
483 bind(&WriteHandle::onTimeout, this, _1, processId));
484}
485
486void
487WriteHandle::processSegmentedInsertCommand(const Interest& interest,
488 RepoCommandParameter& parameter)
489{
490 if (parameter.hasEndBlockId()) {
491 //normal fetch segment
492 if (!parameter.hasStartBlockId()) {
493 parameter.setStartBlockId(0);
494 }
495
496 SegmentNo startBlockId = parameter.getStartBlockId();
497 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700498 if (startBlockId > endBlockId) {
499 negativeReply(interest, 403);
500 return;
501 }
502
503 ProcessId processId = generateProcessId();
504 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700505 RepoCommandResponse& response = process.response;
506 response.setStatusCode(100);
507 response.setProcessId(processId);
508 response.setInsertNum(0);
509 response.setStartBlockId(startBlockId);
510 response.setEndBlockId(endBlockId);
511
512 reply(interest, response);
513
514 //300 means data fetching is in progress
515 response.setStatusCode(300);
516
517 segInit(processId, parameter);
518 }
519 else {
520 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
521 ProcessId processId = generateProcessId();
522 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700523 RepoCommandResponse& response = process.response;
524 response.setStatusCode(100);
525 response.setProcessId(processId);
526 response.setInsertNum(0);
527 response.setStartBlockId(parameter.getStartBlockId());
528 reply(interest, response);
529
530 //300 means data fetching is in progress
531 response.setStatusCode(300);
532
533 segInit(processId, parameter);
534 }
535}
536
537void
538WriteHandle::extendNoEndTime(ProcessInfo& process)
539{
540 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
541 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
542 RepoCommandResponse& response = process.response;
543 if (now > noEndTime) {
544 response.setStatusCode(405);
545 return;
546 }
547 //extends noEndTime
548 process.noEndTime =
549 ndn::time::steady_clock::now() + m_noEndTimeout;
550
551}
552
553void
554WriteHandle::negativeReply(const Interest& interest, int statusCode)
555{
556 RepoCommandResponse response;
557 response.setStatusCode(statusCode);
558 reply(interest, response);
559}
560
561} //namespace repo