blob: 77c674b0bf3919d141513e828e9012953c055cc9 [file] [log] [blame]
Shuo Chen29c77fe2014-03-18 11:29:41 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07003 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Shuo Chen29c77fe2014-03-18 11:29:41 -070018 */
19
20#include "write-handle.hpp"
21
22namespace repo {
23
24static const int RETRY_TIMEOUT = 3;
25static const int DEFAULT_CREDIT = 12;
26static const ndn::time::milliseconds NOEND_TIMEOUT(10000);
27static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000);
28
29WriteHandle::WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
30 Scheduler& scheduler, CommandInterestValidator& validator)
31 : BaseHandle(face, storageHandle, keyChain, scheduler)
32 , m_validator(validator)
33 , m_retryTime(RETRY_TIMEOUT)
34 , m_credit(DEFAULT_CREDIT)
35 , m_noEndTimeout(NOEND_TIMEOUT)
36{
37}
38
39void
40WriteHandle::deleteProcess(ProcessId processId)
41{
42 m_processes.erase(processId);
43}
44
45// Interest.
46void
47WriteHandle::onInterest(const Name& prefix, const Interest& interest)
48{
49 m_validator.validate(interest,
50 bind(&WriteHandle::onValidated, this, _1, prefix),
51 bind(&WriteHandle::onValidationFailed, this, _1));
52}
53
Wentao Shang91fb4f22014-05-20 10:55:22 -070054void
55WriteHandle::onRegisterSuccess(const Name& prefix)
56{
57 std::cerr << "Successfully registered prefix " << prefix << std::endl;
58}
59
Shuo Chen29c77fe2014-03-18 11:29:41 -070060// onRegisterFailed.
61void
62WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
63{
64 throw Error("Insert prefix registration failed");
65}
66
67// onRegisterFailed for insert.
68void
69WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
70{
71 throw Error("Insert check prefix registration failed");
72}
73
74void
75WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
76{
77 //m_validResult = 1;
78 RepoCommandParameter parameter;
79 try {
80 extractParameter(*interest, prefix, parameter);
81 }
82 catch (RepoCommandParameter::Error) {
83 negativeReply(*interest, 403);
84 return;
85 }
86
87 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
88 if (parameter.hasSelectors()) {
89 negativeReply(*interest, 402);
90 return;
91 }
92 processSegmentedInsertCommand(*interest, parameter);
93 }
94 else {
95 processSingleInsertCommand(*interest, parameter);
96 }
97
98}
99
100void
101WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest)
102{
103 std::cout << "invalidated" << std::endl;
104 negativeReply(*interest, 401);
105}
106
107void
108WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
109{
110 //std::cout << "onData" << std::endl;
111 //std::cout << "I: " << interest.toUri() << std::endl;
112 //std::cout << "D: " << data.getName().toUri() << std::endl;
113 if (m_processes.count(processId) == 0) {
114 return;
115 }
116
117 ProcessInfo& process = m_processes[processId];
118 RepoCommandResponse& response = process.response;
119
120 if (response.getInsertNum() == 0) {
121 getStorageHandle().insertData(data);
122 response.setInsertNum(1);
123 }
124
125 deferredDeleteProcess(processId);
126}
127
128void
129WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
130{
131 //std::cout << "I: " << interest.toUri() << std::endl;
132 //std::cout << "D: " << data.getName().toUri() << std::endl;
133 //retrieve the process from the responsemap
134
135 if (m_processes.count(processId) == 0) {
136 return;
137 }
138 RepoCommandResponse& response = m_processes[processId].response;
139
140 //refresh endBlockId
141 Name::Component finalBlockId = data.getFinalBlockId();
142
143 if (!finalBlockId.empty()) {
144 SegmentNo final = finalBlockId.toSegment();
145 if (response.hasEndBlockId()) {
146 if (final < response.getEndBlockId()) {
147 response.setEndBlockId(final);
148 }
149 }
150 else {
151 response.setEndBlockId(final);
152 }
153 }
154
155 //insert data
156 //std::cout << "start to insert" << std::endl;
157 if (getStorageHandle().insertData(data)) {
158 response.setInsertNum(response.getInsertNum() + 1);
159 }
160 //std::cout << "end of insert" << std::endl;
161
162 //it->second = response;
163
164 onSegmentDataControl(processId, interest);
165}
166
167void
168WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
169{
170 std::cout << "Timeout" << std::endl;
171 m_processes.erase(processId);
172}
173
174void
175WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
176{
177 std::cout << "SegTimeout" << std::endl;
178
179 onSegmentTimeoutControl(processId, interest);
180}
181
182void
183WriteHandle::listen(const Name& prefix)
184{
185 Name insertPrefix;
186 insertPrefix.append(prefix).append("insert");
187 getFace().setInterestFilter(insertPrefix,
188 bind(&WriteHandle::onInterest, this, _1, _2),
Wentao Shang91fb4f22014-05-20 10:55:22 -0700189 bind(&WriteHandle::onRegisterSuccess, this, _1),
Shuo Chen29c77fe2014-03-18 11:29:41 -0700190 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
191 Name insertCheckPrefix;
192 insertCheckPrefix.append(prefix).append("insert check");
193 getFace().setInterestFilter(insertCheckPrefix,
194 bind(&WriteHandle::onCheckInterest, this, _1, _2),
Wentao Shang91fb4f22014-05-20 10:55:22 -0700195 bind(&WriteHandle::onRegisterSuccess, this, _1),
Shuo Chen29c77fe2014-03-18 11:29:41 -0700196 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
197}
198
199void
200WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
201{
202 ProcessInfo& process = m_processes[processId];
203 process.credit = 0;
204
205 map<SegmentNo, int>& processRetry = process.retryCounts;
206
207 Name name = parameter.getName();
208 SegmentNo startBlockId = parameter.getStartBlockId();
209
210 uint64_t initialCredit = m_credit;
211
212 if (parameter.hasEndBlockId()) {
213 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800214 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700215 }
216 else {
217 // set noEndTimeout timer
218 process.noEndTime = ndn::time::steady_clock::now() +
219 m_noEndTimeout;
220 }
221 process.credit = initialCredit;
222 SegmentNo segment = startBlockId;
223 for (; segment < startBlockId + initialCredit; ++segment) {
224 Name fetchName = name;
225 fetchName.appendSegment(segment);
226 Interest interest(fetchName);
227 //std::cout << "seg:" << j<<std::endl;
228 getFace().expressInterest(interest,
229 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
230 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
231 process.credit--;
232 processRetry[segment] = 0;
233 }
234
235 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
236
Shuo Chen29c77fe2014-03-18 11:29:41 -0700237 process.nextSegment = segment;
238 nextSegmentQueue.push(segment);
239}
240
241void
242WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
243{
244 //std::cout << "onSegmentDataControl: " << processId << std::endl;
245
246 if (m_processes.count(processId) == 0) {
247 return;
248 }
249 ProcessInfo& process = m_processes[processId];
250 RepoCommandResponse& response = process.response;
251 int& processCredit = process.credit;
252 //onSegmentDataControl is called when a data returns.
253 //When data returns, processCredit++
254 processCredit++;
255 SegmentNo& nextSegment = process.nextSegment;
256 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
257 map<SegmentNo, int>& retryCounts = process.retryCounts;
258
259 //read whether notime timeout
260 if (!response.hasEndBlockId()) {
261
262 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
263 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
264
265 if (now > noEndTime) {
266 std::cout << "noEndtimeout: " << processId << std::endl;
267 //m_processes.erase(processId);
268 //StatusCode should be refreshed as 405
269 response.setStatusCode(405);
270 //schedule a delete event
271 deferredDeleteProcess(processId);
272 return;
273 }
274 }
275
276 //read whether this process has total ends, if ends, remove control info from the maps
277 if (response.hasEndBlockId()) {
278 uint64_t nSegments =
279 response.getEndBlockId() - response.getStartBlockId() + 1;
280 if (response.getInsertNum() >= nSegments) {
281 //m_processes.erase(processId);
282 //All the data has been inserted, StatusCode is refreshed as 200
283 response.setStatusCode(200);
284 deferredDeleteProcess(processId);
285 return;
286 }
287 }
288
289 //check whether there is any credit
290 if (processCredit == 0)
291 return;
292
293
294 //check whether sent queue empty
295 if (nextSegmentQueue.empty()) {
296 //do not do anything
297 return;
298 }
299
300 //pop the queue
301 SegmentNo sendingSegment = nextSegmentQueue.front();
302 nextSegmentQueue.pop();
303
304 //check whether sendingSegment exceeds
Shuo Chenccfbe242014-04-29 23:57:51 +0800305 if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700306 //do not do anything
307 return;
308 }
309
310 //read whether this is retransmitted data;
311 SegmentNo fetchedSegment =
312 interest.getName().get(interest.getName().size() - 1).toSegment();
313
314 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
315
316 //find this fetched data, remove it from this map
317 //rit->second.erase(oit);
318 retryCounts.erase(fetchedSegment);
319 //express the interest of the top of the queue
320 Name fetchName(interest.getName().getPrefix(-1));
321 fetchName.appendSegment(sendingSegment);
322 Interest fetchInterest(fetchName);
323 getFace().expressInterest(fetchInterest,
324 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
325 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
326 //When an interest is expressed, processCredit--
327 processCredit--;
328 //std::cout << "sent seg: " << sendingSegment << std::endl;
329 if (retryCounts.count(sendingSegment) == 0) {
330 //not found
331 retryCounts[sendingSegment] = 0;
332 }
333 else {
334 //found
335 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
336 }
337 //increase the next seg and put it into the queue
Shuo Chenccfbe242014-04-29 23:57:51 +0800338 if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700339 nextSegment++;
340 nextSegmentQueue.push(nextSegment);
341 }
342}
343
344void
345WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
346{
347 if (m_processes.count(processId) == 0) {
348 return;
349 }
350 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700351 // RepoCommandResponse& response = process.response;
352 // SegmentNo& nextSegment = process.nextSegment;
353 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700354 map<SegmentNo, int>& retryCounts = process.retryCounts;
355
356 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
357
358 std::cout << "timeoutSegment: " << timeoutSegment << std::endl;
359
360 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
361
362 //read the retry time. If retry out of time, fail the process. if not, plus
363 int& retryTime = retryCounts[timeoutSegment];
364 if (retryTime >= m_retryTime) {
365 //fail this process
366 std::cout << "Retry timeout: " << processId << std::endl;
367 m_processes.erase(processId);
368 return;
369 }
370 else {
371 //Reput it in the queue, retryTime++
372 retryTime++;
373 Interest retryInterest(interest.getName());
374 getFace().expressInterest(retryInterest,
375 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
376 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
377 }
378
379}
380
381void
382WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
383{
384 m_validator.validate(interest,
385 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
386 bind(&WriteHandle::onCheckValidationFailed, this, _1));
387
388}
389
390void
391WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
392{
393 RepoCommandParameter parameter;
394 try {
395 extractParameter(*interest, prefix, parameter);
396 }
397 catch (RepoCommandParameter::Error) {
398 negativeReply(*interest, 403);
399 return;
400 }
401
402 if (!parameter.hasProcessId()) {
403 negativeReply(*interest, 403);
404 return;
405 }
406 //check whether this process exists
407 ProcessId processId = parameter.getProcessId();
408 if (m_processes.count(processId) == 0) {
409 std::cout << "no such processId: " << processId << std::endl;
410 negativeReply(*interest, 404);
411 return;
412 }
413
414 ProcessInfo& process = m_processes[processId];
415
416 RepoCommandResponse& response = process.response;
417
418 //Check whether it is single data fetching
419 if (!response.hasStartBlockId() &&
420 !response.hasEndBlockId()) {
421 reply(*interest, response);
422 return;
423 }
424
425 //read if noEndtimeout
426 if (!response.hasEndBlockId()) {
427 extendNoEndTime(process);
428 reply(*interest, response);
429 return;
430 }
431 else {
432 reply(*interest, response);
433 }
434}
435
436void
437WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest)
438{
439 negativeReply(*interest, 401);
440}
441
442void
443WriteHandle::deferredDeleteProcess(ProcessId processId)
444{
445 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
446 ndn::bind(&WriteHandle::deleteProcess, this, processId));
447}
448
449void
450WriteHandle::processSingleInsertCommand(const Interest& interest,
451 RepoCommandParameter& parameter)
452{
453 ProcessId processId = generateProcessId();
454
455 ProcessInfo& process = m_processes[processId];
456
457 RepoCommandResponse& response = process.response;
458 response.setStatusCode(100);
459 response.setProcessId(processId);
460 response.setInsertNum(0);
461
462 reply(interest, response);
463
464 response.setStatusCode(300);
465
466 Interest fetchInterest(parameter.getName());
467 if (parameter.hasSelectors()) {
468 fetchInterest.setSelectors(parameter.getSelectors());
469 }
470 getFace().expressInterest(fetchInterest,
471 bind(&WriteHandle::onData, this, _1, _2, processId),
472 bind(&WriteHandle::onTimeout, this, _1, processId));
473}
474
475void
476WriteHandle::processSegmentedInsertCommand(const Interest& interest,
477 RepoCommandParameter& parameter)
478{
479 if (parameter.hasEndBlockId()) {
480 //normal fetch segment
481 if (!parameter.hasStartBlockId()) {
482 parameter.setStartBlockId(0);
483 }
484
485 SegmentNo startBlockId = parameter.getStartBlockId();
486 SegmentNo endBlockId = parameter.getEndBlockId();
487 //std::cout << "startBlockId: " << startBlockId << std::endl;
488 //std::cout << "endBlockId: " << endBlockId << std::endl;
489 if (startBlockId > endBlockId) {
490 negativeReply(interest, 403);
491 return;
492 }
493
494 ProcessId processId = generateProcessId();
495 ProcessInfo& process = m_processes[processId];
496 //std::cout << "processId: " << processId << std::endl;
497 RepoCommandResponse& response = process.response;
498 response.setStatusCode(100);
499 response.setProcessId(processId);
500 response.setInsertNum(0);
501 response.setStartBlockId(startBlockId);
502 response.setEndBlockId(endBlockId);
503
504 reply(interest, response);
505
506 //300 means data fetching is in progress
507 response.setStatusCode(300);
508
509 segInit(processId, parameter);
510 }
511 else {
512 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
513 ProcessId processId = generateProcessId();
514 ProcessInfo& process = m_processes[processId];
515 //std::cout << "processId: " << processId << std::endl;
516 RepoCommandResponse& response = process.response;
517 response.setStatusCode(100);
518 response.setProcessId(processId);
519 response.setInsertNum(0);
520 response.setStartBlockId(parameter.getStartBlockId());
521 reply(interest, response);
522
523 //300 means data fetching is in progress
524 response.setStatusCode(300);
525
526 segInit(processId, parameter);
527 }
528}
529
530void
531WriteHandle::extendNoEndTime(ProcessInfo& process)
532{
533 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
534 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
535 RepoCommandResponse& response = process.response;
536 if (now > noEndTime) {
537 response.setStatusCode(405);
538 return;
539 }
540 //extends noEndTime
541 process.noEndTime =
542 ndn::time::steady_clock::now() + m_noEndTimeout;
543
544}
545
546void
547WriteHandle::negativeReply(const Interest& interest, int statusCode)
548{
549 RepoCommandResponse response;
550 response.setStatusCode(statusCode);
551 reply(interest, response);
552}
553
554} //namespace repo