blob: 91557f32e90cc659ce5a3c3c783888471f66a850 [file] [log] [blame]
Shuo Chen29c77fe2014-03-18 11:29:41 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07003 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Shuo Chen29c77fe2014-03-18 11:29:41 -070018 */
19
20#include "write-handle.hpp"
21
22namespace repo {
23
24static const int RETRY_TIMEOUT = 3;
25static const int DEFAULT_CREDIT = 12;
26static const ndn::time::milliseconds NOEND_TIMEOUT(10000);
27static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000);
28
29WriteHandle::WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
Shuo Chen028dcd32014-06-21 16:36:44 +080030 Scheduler& scheduler, ValidatorConfig& validator)
Shuo Chen29c77fe2014-03-18 11:29:41 -070031 : BaseHandle(face, storageHandle, keyChain, scheduler)
32 , m_validator(validator)
33 , m_retryTime(RETRY_TIMEOUT)
34 , m_credit(DEFAULT_CREDIT)
35 , m_noEndTimeout(NOEND_TIMEOUT)
36{
37}
38
39void
40WriteHandle::deleteProcess(ProcessId processId)
41{
42 m_processes.erase(processId);
43}
44
45// Interest.
46void
47WriteHandle::onInterest(const Name& prefix, const Interest& interest)
48{
49 m_validator.validate(interest,
50 bind(&WriteHandle::onValidated, this, _1, prefix),
Shuo Chen028dcd32014-06-21 16:36:44 +080051 bind(&WriteHandle::onValidationFailed, this, _1, _2));
Shuo Chen29c77fe2014-03-18 11:29:41 -070052}
53
Wentao Shang91fb4f22014-05-20 10:55:22 -070054void
55WriteHandle::onRegisterSuccess(const Name& prefix)
56{
57 std::cerr << "Successfully registered prefix " << prefix << std::endl;
58}
59
Shuo Chen29c77fe2014-03-18 11:29:41 -070060// onRegisterFailed.
61void
62WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
63{
64 throw Error("Insert prefix registration failed");
65}
66
67// onRegisterFailed for insert.
68void
69WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
70{
71 throw Error("Insert check prefix registration failed");
72}
73
74void
75WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
76{
77 //m_validResult = 1;
78 RepoCommandParameter parameter;
79 try {
80 extractParameter(*interest, prefix, parameter);
81 }
82 catch (RepoCommandParameter::Error) {
83 negativeReply(*interest, 403);
84 return;
85 }
86
87 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
88 if (parameter.hasSelectors()) {
89 negativeReply(*interest, 402);
90 return;
91 }
92 processSegmentedInsertCommand(*interest, parameter);
93 }
94 else {
95 processSingleInsertCommand(*interest, parameter);
96 }
97
98}
99
100void
Shuo Chen028dcd32014-06-21 16:36:44 +0800101WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest, const string& reason)
Shuo Chen29c77fe2014-03-18 11:29:41 -0700102{
Shuo Chen028dcd32014-06-21 16:36:44 +0800103 std::cerr << reason << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700104 negativeReply(*interest, 401);
105}
106
107void
108WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
109{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700110 if (m_processes.count(processId) == 0) {
111 return;
112 }
113
114 ProcessInfo& process = m_processes[processId];
115 RepoCommandResponse& response = process.response;
116
117 if (response.getInsertNum() == 0) {
118 getStorageHandle().insertData(data);
119 response.setInsertNum(1);
120 }
121
122 deferredDeleteProcess(processId);
123}
124
125void
126WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
127{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700128 if (m_processes.count(processId) == 0) {
129 return;
130 }
131 RepoCommandResponse& response = m_processes[processId].response;
132
133 //refresh endBlockId
134 Name::Component finalBlockId = data.getFinalBlockId();
135
136 if (!finalBlockId.empty()) {
137 SegmentNo final = finalBlockId.toSegment();
138 if (response.hasEndBlockId()) {
139 if (final < response.getEndBlockId()) {
140 response.setEndBlockId(final);
141 }
142 }
143 else {
144 response.setEndBlockId(final);
145 }
146 }
147
148 //insert data
Shuo Chen29c77fe2014-03-18 11:29:41 -0700149 if (getStorageHandle().insertData(data)) {
150 response.setInsertNum(response.getInsertNum() + 1);
151 }
Shuo Chen29c77fe2014-03-18 11:29:41 -0700152
153 onSegmentDataControl(processId, interest);
154}
155
156void
157WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
158{
Shuo Chen028dcd32014-06-21 16:36:44 +0800159 std::cerr << "Timeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700160 m_processes.erase(processId);
161}
162
163void
164WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
165{
Shuo Chen028dcd32014-06-21 16:36:44 +0800166 std::cerr << "SegTimeout" << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700167
168 onSegmentTimeoutControl(processId, interest);
169}
170
171void
172WriteHandle::listen(const Name& prefix)
173{
174 Name insertPrefix;
175 insertPrefix.append(prefix).append("insert");
176 getFace().setInterestFilter(insertPrefix,
177 bind(&WriteHandle::onInterest, this, _1, _2),
Wentao Shang91fb4f22014-05-20 10:55:22 -0700178 bind(&WriteHandle::onRegisterSuccess, this, _1),
Shuo Chen29c77fe2014-03-18 11:29:41 -0700179 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
180 Name insertCheckPrefix;
181 insertCheckPrefix.append(prefix).append("insert check");
182 getFace().setInterestFilter(insertCheckPrefix,
183 bind(&WriteHandle::onCheckInterest, this, _1, _2),
Wentao Shang91fb4f22014-05-20 10:55:22 -0700184 bind(&WriteHandle::onRegisterSuccess, this, _1),
Shuo Chen29c77fe2014-03-18 11:29:41 -0700185 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
186}
187
188void
189WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
190{
191 ProcessInfo& process = m_processes[processId];
192 process.credit = 0;
193
194 map<SegmentNo, int>& processRetry = process.retryCounts;
195
196 Name name = parameter.getName();
197 SegmentNo startBlockId = parameter.getStartBlockId();
198
199 uint64_t initialCredit = m_credit;
200
201 if (parameter.hasEndBlockId()) {
202 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800203 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700204 }
205 else {
206 // set noEndTimeout timer
207 process.noEndTime = ndn::time::steady_clock::now() +
208 m_noEndTimeout;
209 }
210 process.credit = initialCredit;
211 SegmentNo segment = startBlockId;
212 for (; segment < startBlockId + initialCredit; ++segment) {
213 Name fetchName = name;
214 fetchName.appendSegment(segment);
215 Interest interest(fetchName);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700216 getFace().expressInterest(interest,
217 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
218 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
219 process.credit--;
220 processRetry[segment] = 0;
221 }
222
223 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
224
Shuo Chen29c77fe2014-03-18 11:29:41 -0700225 process.nextSegment = segment;
226 nextSegmentQueue.push(segment);
227}
228
229void
230WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
231{
Shuo Chen29c77fe2014-03-18 11:29:41 -0700232 if (m_processes.count(processId) == 0) {
233 return;
234 }
235 ProcessInfo& process = m_processes[processId];
236 RepoCommandResponse& response = process.response;
237 int& processCredit = process.credit;
238 //onSegmentDataControl is called when a data returns.
239 //When data returns, processCredit++
240 processCredit++;
241 SegmentNo& nextSegment = process.nextSegment;
242 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
243 map<SegmentNo, int>& retryCounts = process.retryCounts;
244
245 //read whether notime timeout
246 if (!response.hasEndBlockId()) {
247
248 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
249 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
250
251 if (now > noEndTime) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800252 std::cerr << "noEndtimeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700253 //m_processes.erase(processId);
254 //StatusCode should be refreshed as 405
255 response.setStatusCode(405);
256 //schedule a delete event
257 deferredDeleteProcess(processId);
258 return;
259 }
260 }
261
262 //read whether this process has total ends, if ends, remove control info from the maps
263 if (response.hasEndBlockId()) {
264 uint64_t nSegments =
265 response.getEndBlockId() - response.getStartBlockId() + 1;
266 if (response.getInsertNum() >= nSegments) {
267 //m_processes.erase(processId);
268 //All the data has been inserted, StatusCode is refreshed as 200
269 response.setStatusCode(200);
270 deferredDeleteProcess(processId);
271 return;
272 }
273 }
274
275 //check whether there is any credit
276 if (processCredit == 0)
277 return;
278
279
280 //check whether sent queue empty
281 if (nextSegmentQueue.empty()) {
282 //do not do anything
283 return;
284 }
285
286 //pop the queue
287 SegmentNo sendingSegment = nextSegmentQueue.front();
288 nextSegmentQueue.pop();
289
290 //check whether sendingSegment exceeds
Shuo Chenccfbe242014-04-29 23:57:51 +0800291 if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700292 //do not do anything
293 return;
294 }
295
296 //read whether this is retransmitted data;
297 SegmentNo fetchedSegment =
298 interest.getName().get(interest.getName().size() - 1).toSegment();
299
300 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
301
302 //find this fetched data, remove it from this map
303 //rit->second.erase(oit);
304 retryCounts.erase(fetchedSegment);
305 //express the interest of the top of the queue
306 Name fetchName(interest.getName().getPrefix(-1));
307 fetchName.appendSegment(sendingSegment);
308 Interest fetchInterest(fetchName);
309 getFace().expressInterest(fetchInterest,
310 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
311 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
312 //When an interest is expressed, processCredit--
313 processCredit--;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700314 if (retryCounts.count(sendingSegment) == 0) {
315 //not found
316 retryCounts[sendingSegment] = 0;
317 }
318 else {
319 //found
320 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
321 }
322 //increase the next seg and put it into the queue
Shuo Chenccfbe242014-04-29 23:57:51 +0800323 if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
Shuo Chen29c77fe2014-03-18 11:29:41 -0700324 nextSegment++;
325 nextSegmentQueue.push(nextSegment);
326 }
327}
328
329void
330WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
331{
332 if (m_processes.count(processId) == 0) {
333 return;
334 }
335 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700336 // RepoCommandResponse& response = process.response;
337 // SegmentNo& nextSegment = process.nextSegment;
338 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700339 map<SegmentNo, int>& retryCounts = process.retryCounts;
340
341 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
342
Shuo Chen028dcd32014-06-21 16:36:44 +0800343 std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700344
345 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
346
347 //read the retry time. If retry out of time, fail the process. if not, plus
348 int& retryTime = retryCounts[timeoutSegment];
349 if (retryTime >= m_retryTime) {
350 //fail this process
Shuo Chen028dcd32014-06-21 16:36:44 +0800351 std::cerr << "Retry timeout: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700352 m_processes.erase(processId);
353 return;
354 }
355 else {
356 //Reput it in the queue, retryTime++
357 retryTime++;
358 Interest retryInterest(interest.getName());
359 getFace().expressInterest(retryInterest,
360 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
361 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
362 }
363
364}
365
366void
367WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
368{
369 m_validator.validate(interest,
370 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
371 bind(&WriteHandle::onCheckValidationFailed, this, _1));
372
373}
374
375void
376WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
377{
378 RepoCommandParameter parameter;
379 try {
380 extractParameter(*interest, prefix, parameter);
381 }
382 catch (RepoCommandParameter::Error) {
383 negativeReply(*interest, 403);
384 return;
385 }
386
387 if (!parameter.hasProcessId()) {
388 negativeReply(*interest, 403);
389 return;
390 }
391 //check whether this process exists
392 ProcessId processId = parameter.getProcessId();
393 if (m_processes.count(processId) == 0) {
Shuo Chen028dcd32014-06-21 16:36:44 +0800394 std::cerr << "no such processId: " << processId << std::endl;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700395 negativeReply(*interest, 404);
396 return;
397 }
398
399 ProcessInfo& process = m_processes[processId];
400
401 RepoCommandResponse& response = process.response;
402
403 //Check whether it is single data fetching
404 if (!response.hasStartBlockId() &&
405 !response.hasEndBlockId()) {
406 reply(*interest, response);
407 return;
408 }
409
410 //read if noEndtimeout
411 if (!response.hasEndBlockId()) {
412 extendNoEndTime(process);
413 reply(*interest, response);
414 return;
415 }
416 else {
417 reply(*interest, response);
418 }
419}
420
421void
422WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest)
423{
424 negativeReply(*interest, 401);
425}
426
427void
428WriteHandle::deferredDeleteProcess(ProcessId processId)
429{
430 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
431 ndn::bind(&WriteHandle::deleteProcess, this, processId));
432}
433
434void
435WriteHandle::processSingleInsertCommand(const Interest& interest,
436 RepoCommandParameter& parameter)
437{
438 ProcessId processId = generateProcessId();
439
440 ProcessInfo& process = m_processes[processId];
441
442 RepoCommandResponse& response = process.response;
443 response.setStatusCode(100);
444 response.setProcessId(processId);
445 response.setInsertNum(0);
446
447 reply(interest, response);
448
449 response.setStatusCode(300);
450
451 Interest fetchInterest(parameter.getName());
452 if (parameter.hasSelectors()) {
453 fetchInterest.setSelectors(parameter.getSelectors());
454 }
455 getFace().expressInterest(fetchInterest,
456 bind(&WriteHandle::onData, this, _1, _2, processId),
457 bind(&WriteHandle::onTimeout, this, _1, processId));
458}
459
460void
461WriteHandle::processSegmentedInsertCommand(const Interest& interest,
462 RepoCommandParameter& parameter)
463{
464 if (parameter.hasEndBlockId()) {
465 //normal fetch segment
466 if (!parameter.hasStartBlockId()) {
467 parameter.setStartBlockId(0);
468 }
469
470 SegmentNo startBlockId = parameter.getStartBlockId();
471 SegmentNo endBlockId = parameter.getEndBlockId();
Shuo Chen29c77fe2014-03-18 11:29:41 -0700472 if (startBlockId > endBlockId) {
473 negativeReply(interest, 403);
474 return;
475 }
476
477 ProcessId processId = generateProcessId();
478 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700479 RepoCommandResponse& response = process.response;
480 response.setStatusCode(100);
481 response.setProcessId(processId);
482 response.setInsertNum(0);
483 response.setStartBlockId(startBlockId);
484 response.setEndBlockId(endBlockId);
485
486 reply(interest, response);
487
488 //300 means data fetching is in progress
489 response.setStatusCode(300);
490
491 segInit(processId, parameter);
492 }
493 else {
494 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
495 ProcessId processId = generateProcessId();
496 ProcessInfo& process = m_processes[processId];
Shuo Chen29c77fe2014-03-18 11:29:41 -0700497 RepoCommandResponse& response = process.response;
498 response.setStatusCode(100);
499 response.setProcessId(processId);
500 response.setInsertNum(0);
501 response.setStartBlockId(parameter.getStartBlockId());
502 reply(interest, response);
503
504 //300 means data fetching is in progress
505 response.setStatusCode(300);
506
507 segInit(processId, parameter);
508 }
509}
510
511void
512WriteHandle::extendNoEndTime(ProcessInfo& process)
513{
514 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
515 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
516 RepoCommandResponse& response = process.response;
517 if (now > noEndTime) {
518 response.setStatusCode(405);
519 return;
520 }
521 //extends noEndTime
522 process.noEndTime =
523 ndn::time::steady_clock::now() + m_noEndTimeout;
524
525}
526
527void
528WriteHandle::negativeReply(const Interest& interest, int statusCode)
529{
530 RepoCommandResponse response;
531 response.setStatusCode(statusCode);
532 reply(interest, response);
533}
534
535} //namespace repo