blob: c67e775ea89471e193611a96c1dbc19ad5803c55 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2014, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "write-handle.hpp"
21
22namespace repo {
23
24static const int RETRY_TIMEOUT = 3;
25static const int DEFAULT_CREDIT = 12;
26static const ndn::time::milliseconds NOEND_TIMEOUT(10000);
27static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000);
28
29WriteHandle::WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
30 Scheduler& scheduler, CommandInterestValidator& validator)
31 : BaseHandle(face, storageHandle, keyChain, scheduler)
32 , m_validator(validator)
33 , m_retryTime(RETRY_TIMEOUT)
34 , m_credit(DEFAULT_CREDIT)
35 , m_noEndTimeout(NOEND_TIMEOUT)
36{
37}
38
39void
40WriteHandle::deleteProcess(ProcessId processId)
41{
42 m_processes.erase(processId);
43}
44
45// Interest.
46void
47WriteHandle::onInterest(const Name& prefix, const Interest& interest)
48{
49 m_validator.validate(interest,
50 bind(&WriteHandle::onValidated, this, _1, prefix),
51 bind(&WriteHandle::onValidationFailed, this, _1));
52}
53
54// onRegisterFailed.
55void
56WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
57{
58 throw Error("Insert prefix registration failed");
59}
60
61// onRegisterFailed for insert.
62void
63WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
64{
65 throw Error("Insert check prefix registration failed");
66}
67
68void
69WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
70{
71 //m_validResult = 1;
72 RepoCommandParameter parameter;
73 try {
74 extractParameter(*interest, prefix, parameter);
75 }
76 catch (RepoCommandParameter::Error) {
77 negativeReply(*interest, 403);
78 return;
79 }
80
81 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
82 if (parameter.hasSelectors()) {
83 negativeReply(*interest, 402);
84 return;
85 }
86 processSegmentedInsertCommand(*interest, parameter);
87 }
88 else {
89 processSingleInsertCommand(*interest, parameter);
90 }
91
92}
93
94void
95WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest)
96{
97 std::cout << "invalidated" << std::endl;
98 negativeReply(*interest, 401);
99}
100
101void
102WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
103{
104 //std::cout << "onData" << std::endl;
105 //std::cout << "I: " << interest.toUri() << std::endl;
106 //std::cout << "D: " << data.getName().toUri() << std::endl;
107 if (m_processes.count(processId) == 0) {
108 return;
109 }
110
111 ProcessInfo& process = m_processes[processId];
112 RepoCommandResponse& response = process.response;
113
114 if (response.getInsertNum() == 0) {
115 getStorageHandle().insertData(data);
116 response.setInsertNum(1);
117 }
118
119 deferredDeleteProcess(processId);
120}
121
122void
123WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
124{
125 //std::cout << "I: " << interest.toUri() << std::endl;
126 //std::cout << "D: " << data.getName().toUri() << std::endl;
127 //retrieve the process from the responsemap
128
129 if (m_processes.count(processId) == 0) {
130 return;
131 }
132 RepoCommandResponse& response = m_processes[processId].response;
133
134 //refresh endBlockId
135 Name::Component finalBlockId = data.getFinalBlockId();
136
137 if (!finalBlockId.empty()) {
138 SegmentNo final = finalBlockId.toSegment();
139 if (response.hasEndBlockId()) {
140 if (final < response.getEndBlockId()) {
141 response.setEndBlockId(final);
142 }
143 }
144 else {
145 response.setEndBlockId(final);
146 }
147 }
148
149 //insert data
150 //std::cout << "start to insert" << std::endl;
151 if (getStorageHandle().insertData(data)) {
152 response.setInsertNum(response.getInsertNum() + 1);
153 }
154 //std::cout << "end of insert" << std::endl;
155
156 //it->second = response;
157
158 onSegmentDataControl(processId, interest);
159}
160
161void
162WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
163{
164 std::cout << "Timeout" << std::endl;
165 m_processes.erase(processId);
166}
167
168void
169WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
170{
171 std::cout << "SegTimeout" << std::endl;
172
173 onSegmentTimeoutControl(processId, interest);
174}
175
176void
177WriteHandle::listen(const Name& prefix)
178{
179 Name insertPrefix;
180 insertPrefix.append(prefix).append("insert");
181 getFace().setInterestFilter(insertPrefix,
182 bind(&WriteHandle::onInterest, this, _1, _2),
183 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
184 Name insertCheckPrefix;
185 insertCheckPrefix.append(prefix).append("insert check");
186 getFace().setInterestFilter(insertCheckPrefix,
187 bind(&WriteHandle::onCheckInterest, this, _1, _2),
188 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
189}
190
191void
192WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
193{
194 ProcessInfo& process = m_processes[processId];
195 process.credit = 0;
196
197 map<SegmentNo, int>& processRetry = process.retryCounts;
198
199 Name name = parameter.getName();
200 SegmentNo startBlockId = parameter.getStartBlockId();
201
202 uint64_t initialCredit = m_credit;
203
204 if (parameter.hasEndBlockId()) {
205 initialCredit =
Shuo Chen39fe8da2014-04-30 00:00:35 +0800206 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
Shuo Chen29c77fe2014-03-18 11:29:41 -0700207 }
208 else {
209 // set noEndTimeout timer
210 process.noEndTime = ndn::time::steady_clock::now() +
211 m_noEndTimeout;
212 }
213 process.credit = initialCredit;
214 SegmentNo segment = startBlockId;
215 for (; segment < startBlockId + initialCredit; ++segment) {
216 Name fetchName = name;
217 fetchName.appendSegment(segment);
218 Interest interest(fetchName);
219 //std::cout << "seg:" << j<<std::endl;
220 getFace().expressInterest(interest,
221 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
222 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
223 process.credit--;
224 processRetry[segment] = 0;
225 }
226
227 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
228
Shuo Chen29c77fe2014-03-18 11:29:41 -0700229 process.nextSegment = segment;
230 nextSegmentQueue.push(segment);
231}
232
233void
234WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
235{
236 //std::cout << "onSegmentDataControl: " << processId << std::endl;
237
238 if (m_processes.count(processId) == 0) {
239 return;
240 }
241 ProcessInfo& process = m_processes[processId];
242 RepoCommandResponse& response = process.response;
243 int& processCredit = process.credit;
244 //onSegmentDataControl is called when a data returns.
245 //When data returns, processCredit++
246 processCredit++;
247 SegmentNo& nextSegment = process.nextSegment;
248 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
249 map<SegmentNo, int>& retryCounts = process.retryCounts;
250
251 //read whether notime timeout
252 if (!response.hasEndBlockId()) {
253
254 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
255 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
256
257 if (now > noEndTime) {
258 std::cout << "noEndtimeout: " << processId << std::endl;
259 //m_processes.erase(processId);
260 //StatusCode should be refreshed as 405
261 response.setStatusCode(405);
262 //schedule a delete event
263 deferredDeleteProcess(processId);
264 return;
265 }
266 }
267
268 //read whether this process has total ends, if ends, remove control info from the maps
269 if (response.hasEndBlockId()) {
270 uint64_t nSegments =
271 response.getEndBlockId() - response.getStartBlockId() + 1;
272 if (response.getInsertNum() >= nSegments) {
273 //m_processes.erase(processId);
274 //All the data has been inserted, StatusCode is refreshed as 200
275 response.setStatusCode(200);
276 deferredDeleteProcess(processId);
277 return;
278 }
279 }
280
281 //check whether there is any credit
282 if (processCredit == 0)
283 return;
284
285
286 //check whether sent queue empty
287 if (nextSegmentQueue.empty()) {
288 //do not do anything
289 return;
290 }
291
292 //pop the queue
293 SegmentNo sendingSegment = nextSegmentQueue.front();
294 nextSegmentQueue.pop();
295
296 //check whether sendingSegment exceeds
297 if (sendingSegment > response.getEndBlockId()) {
298 //do not do anything
299 return;
300 }
301
302 //read whether this is retransmitted data;
303 SegmentNo fetchedSegment =
304 interest.getName().get(interest.getName().size() - 1).toSegment();
305
306 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
307
308 //find this fetched data, remove it from this map
309 //rit->second.erase(oit);
310 retryCounts.erase(fetchedSegment);
311 //express the interest of the top of the queue
312 Name fetchName(interest.getName().getPrefix(-1));
313 fetchName.appendSegment(sendingSegment);
314 Interest fetchInterest(fetchName);
315 getFace().expressInterest(fetchInterest,
316 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
317 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
318 //When an interest is expressed, processCredit--
319 processCredit--;
320 //std::cout << "sent seg: " << sendingSegment << std::endl;
321 if (retryCounts.count(sendingSegment) == 0) {
322 //not found
323 retryCounts[sendingSegment] = 0;
324 }
325 else {
326 //found
327 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
328 }
329 //increase the next seg and put it into the queue
330 if ((nextSegment + 1) <= response.getEndBlockId()) {
331 nextSegment++;
332 nextSegmentQueue.push(nextSegment);
333 }
334}
335
336void
337WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
338{
339 if (m_processes.count(processId) == 0) {
340 return;
341 }
342 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700343 // RepoCommandResponse& response = process.response;
344 // SegmentNo& nextSegment = process.nextSegment;
345 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700346 map<SegmentNo, int>& retryCounts = process.retryCounts;
347
348 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
349
350 std::cout << "timeoutSegment: " << timeoutSegment << std::endl;
351
352 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
353
354 //read the retry time. If retry out of time, fail the process. if not, plus
355 int& retryTime = retryCounts[timeoutSegment];
356 if (retryTime >= m_retryTime) {
357 //fail this process
358 std::cout << "Retry timeout: " << processId << std::endl;
359 m_processes.erase(processId);
360 return;
361 }
362 else {
363 //Reput it in the queue, retryTime++
364 retryTime++;
365 Interest retryInterest(interest.getName());
366 getFace().expressInterest(retryInterest,
367 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
368 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
369 }
370
371}
372
373void
374WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
375{
376 m_validator.validate(interest,
377 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
378 bind(&WriteHandle::onCheckValidationFailed, this, _1));
379
380}
381
382void
383WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
384{
385 RepoCommandParameter parameter;
386 try {
387 extractParameter(*interest, prefix, parameter);
388 }
389 catch (RepoCommandParameter::Error) {
390 negativeReply(*interest, 403);
391 return;
392 }
393
394 if (!parameter.hasProcessId()) {
395 negativeReply(*interest, 403);
396 return;
397 }
398 //check whether this process exists
399 ProcessId processId = parameter.getProcessId();
400 if (m_processes.count(processId) == 0) {
401 std::cout << "no such processId: " << processId << std::endl;
402 negativeReply(*interest, 404);
403 return;
404 }
405
406 ProcessInfo& process = m_processes[processId];
407
408 RepoCommandResponse& response = process.response;
409
410 //Check whether it is single data fetching
411 if (!response.hasStartBlockId() &&
412 !response.hasEndBlockId()) {
413 reply(*interest, response);
414 return;
415 }
416
417 //read if noEndtimeout
418 if (!response.hasEndBlockId()) {
419 extendNoEndTime(process);
420 reply(*interest, response);
421 return;
422 }
423 else {
424 reply(*interest, response);
425 }
426}
427
428void
429WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest)
430{
431 negativeReply(*interest, 401);
432}
433
434void
435WriteHandle::deferredDeleteProcess(ProcessId processId)
436{
437 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
438 ndn::bind(&WriteHandle::deleteProcess, this, processId));
439}
440
441void
442WriteHandle::processSingleInsertCommand(const Interest& interest,
443 RepoCommandParameter& parameter)
444{
445 ProcessId processId = generateProcessId();
446
447 ProcessInfo& process = m_processes[processId];
448
449 RepoCommandResponse& response = process.response;
450 response.setStatusCode(100);
451 response.setProcessId(processId);
452 response.setInsertNum(0);
453
454 reply(interest, response);
455
456 response.setStatusCode(300);
457
458 Interest fetchInterest(parameter.getName());
459 if (parameter.hasSelectors()) {
460 fetchInterest.setSelectors(parameter.getSelectors());
461 }
462 getFace().expressInterest(fetchInterest,
463 bind(&WriteHandle::onData, this, _1, _2, processId),
464 bind(&WriteHandle::onTimeout, this, _1, processId));
465}
466
467void
468WriteHandle::processSegmentedInsertCommand(const Interest& interest,
469 RepoCommandParameter& parameter)
470{
471 if (parameter.hasEndBlockId()) {
472 //normal fetch segment
473 if (!parameter.hasStartBlockId()) {
474 parameter.setStartBlockId(0);
475 }
476
477 SegmentNo startBlockId = parameter.getStartBlockId();
478 SegmentNo endBlockId = parameter.getEndBlockId();
479 //std::cout << "startBlockId: " << startBlockId << std::endl;
480 //std::cout << "endBlockId: " << endBlockId << std::endl;
481 if (startBlockId > endBlockId) {
482 negativeReply(interest, 403);
483 return;
484 }
485
486 ProcessId processId = generateProcessId();
487 ProcessInfo& process = m_processes[processId];
488 //std::cout << "processId: " << processId << std::endl;
489 RepoCommandResponse& response = process.response;
490 response.setStatusCode(100);
491 response.setProcessId(processId);
492 response.setInsertNum(0);
493 response.setStartBlockId(startBlockId);
494 response.setEndBlockId(endBlockId);
495
496 reply(interest, response);
497
498 //300 means data fetching is in progress
499 response.setStatusCode(300);
500
501 segInit(processId, parameter);
502 }
503 else {
504 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
505 ProcessId processId = generateProcessId();
506 ProcessInfo& process = m_processes[processId];
507 //std::cout << "processId: " << processId << std::endl;
508 RepoCommandResponse& response = process.response;
509 response.setStatusCode(100);
510 response.setProcessId(processId);
511 response.setInsertNum(0);
512 response.setStartBlockId(parameter.getStartBlockId());
513 reply(interest, response);
514
515 //300 means data fetching is in progress
516 response.setStatusCode(300);
517
518 segInit(processId, parameter);
519 }
520}
521
522void
523WriteHandle::extendNoEndTime(ProcessInfo& process)
524{
525 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
526 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
527 RepoCommandResponse& response = process.response;
528 if (now > noEndTime) {
529 response.setStatusCode(405);
530 return;
531 }
532 //extends noEndTime
533 process.noEndTime =
534 ndn::time::steady_clock::now() + m_noEndTimeout;
535
536}
537
538void
539WriteHandle::negativeReply(const Interest& interest, int statusCode)
540{
541 RepoCommandResponse response;
542 response.setStatusCode(statusCode);
543 reply(interest, response);
544}
545
546} //namespace repo