/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2014, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE.  See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "write-handle.hpp"
21
22namespace repo {
23
24static const int RETRY_TIMEOUT = 3;
25static const int DEFAULT_CREDIT = 12;
26static const ndn::time::milliseconds NOEND_TIMEOUT(10000);
27static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000);
28
29WriteHandle::WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
30 Scheduler& scheduler, CommandInterestValidator& validator)
31 : BaseHandle(face, storageHandle, keyChain, scheduler)
32 , m_validator(validator)
33 , m_retryTime(RETRY_TIMEOUT)
34 , m_credit(DEFAULT_CREDIT)
35 , m_noEndTimeout(NOEND_TIMEOUT)
36{
37}
38
39void
40WriteHandle::deleteProcess(ProcessId processId)
41{
42 m_processes.erase(processId);
43}
44
45// Interest.
46void
47WriteHandle::onInterest(const Name& prefix, const Interest& interest)
48{
49 m_validator.validate(interest,
50 bind(&WriteHandle::onValidated, this, _1, prefix),
51 bind(&WriteHandle::onValidationFailed, this, _1));
52}
53
54// onRegisterFailed.
55void
56WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
57{
58 throw Error("Insert prefix registration failed");
59}
60
61// onRegisterFailed for insert.
62void
63WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
64{
65 throw Error("Insert check prefix registration failed");
66}
67
68void
69WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
70{
71 //m_validResult = 1;
72 RepoCommandParameter parameter;
73 try {
74 extractParameter(*interest, prefix, parameter);
75 }
76 catch (RepoCommandParameter::Error) {
77 negativeReply(*interest, 403);
78 return;
79 }
80
81 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
82 if (parameter.hasSelectors()) {
83 negativeReply(*interest, 402);
84 return;
85 }
86 processSegmentedInsertCommand(*interest, parameter);
87 }
88 else {
89 processSingleInsertCommand(*interest, parameter);
90 }
91
92}
93
94void
95WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest)
96{
97 std::cout << "invalidated" << std::endl;
98 negativeReply(*interest, 401);
99}
100
101void
102WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
103{
104 //std::cout << "onData" << std::endl;
105 //std::cout << "I: " << interest.toUri() << std::endl;
106 //std::cout << "D: " << data.getName().toUri() << std::endl;
107 if (m_processes.count(processId) == 0) {
108 return;
109 }
110
111 ProcessInfo& process = m_processes[processId];
112 RepoCommandResponse& response = process.response;
113
114 if (response.getInsertNum() == 0) {
115 getStorageHandle().insertData(data);
116 response.setInsertNum(1);
117 }
118
119 deferredDeleteProcess(processId);
120}
121
122void
123WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
124{
125 //std::cout << "I: " << interest.toUri() << std::endl;
126 //std::cout << "D: " << data.getName().toUri() << std::endl;
127 //retrieve the process from the responsemap
128
129 if (m_processes.count(processId) == 0) {
130 return;
131 }
132 RepoCommandResponse& response = m_processes[processId].response;
133
134 //refresh endBlockId
135 Name::Component finalBlockId = data.getFinalBlockId();
136
137 if (!finalBlockId.empty()) {
138 SegmentNo final = finalBlockId.toSegment();
139 if (response.hasEndBlockId()) {
140 if (final < response.getEndBlockId()) {
141 response.setEndBlockId(final);
142 }
143 }
144 else {
145 response.setEndBlockId(final);
146 }
147 }
148
149 //insert data
150 //std::cout << "start to insert" << std::endl;
151 if (getStorageHandle().insertData(data)) {
152 response.setInsertNum(response.getInsertNum() + 1);
153 }
154 //std::cout << "end of insert" << std::endl;
155
156 //it->second = response;
157
158 onSegmentDataControl(processId, interest);
159}
160
161void
162WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
163{
164 std::cout << "Timeout" << std::endl;
165 m_processes.erase(processId);
166}
167
168void
169WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
170{
171 std::cout << "SegTimeout" << std::endl;
172
173 onSegmentTimeoutControl(processId, interest);
174}
175
176void
177WriteHandle::listen(const Name& prefix)
178{
179 Name insertPrefix;
180 insertPrefix.append(prefix).append("insert");
181 getFace().setInterestFilter(insertPrefix,
182 bind(&WriteHandle::onInterest, this, _1, _2),
183 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
184 Name insertCheckPrefix;
185 insertCheckPrefix.append(prefix).append("insert check");
186 getFace().setInterestFilter(insertCheckPrefix,
187 bind(&WriteHandle::onCheckInterest, this, _1, _2),
188 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
189}
190
191void
192WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
193{
194 ProcessInfo& process = m_processes[processId];
195 process.credit = 0;
196
197 map<SegmentNo, int>& processRetry = process.retryCounts;
198
199 Name name = parameter.getName();
200 SegmentNo startBlockId = parameter.getStartBlockId();
201
202 uint64_t initialCredit = m_credit;
203
204 if (parameter.hasEndBlockId()) {
205 initialCredit =
206 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId());
207 }
208 else {
209 // set noEndTimeout timer
210 process.noEndTime = ndn::time::steady_clock::now() +
211 m_noEndTimeout;
212 }
213 process.credit = initialCredit;
214 SegmentNo segment = startBlockId;
215 for (; segment < startBlockId + initialCredit; ++segment) {
216 Name fetchName = name;
217 fetchName.appendSegment(segment);
218 Interest interest(fetchName);
219 //std::cout << "seg:" << j<<std::endl;
220 getFace().expressInterest(interest,
221 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
222 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
223 process.credit--;
224 processRetry[segment] = 0;
225 }
226
227 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
228
229 segment++;
230 process.nextSegment = segment;
231 nextSegmentQueue.push(segment);
232}
233
234void
235WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
236{
237 //std::cout << "onSegmentDataControl: " << processId << std::endl;
238
239 if (m_processes.count(processId) == 0) {
240 return;
241 }
242 ProcessInfo& process = m_processes[processId];
243 RepoCommandResponse& response = process.response;
244 int& processCredit = process.credit;
245 //onSegmentDataControl is called when a data returns.
246 //When data returns, processCredit++
247 processCredit++;
248 SegmentNo& nextSegment = process.nextSegment;
249 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
250 map<SegmentNo, int>& retryCounts = process.retryCounts;
251
252 //read whether notime timeout
253 if (!response.hasEndBlockId()) {
254
255 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
256 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
257
258 if (now > noEndTime) {
259 std::cout << "noEndtimeout: " << processId << std::endl;
260 //m_processes.erase(processId);
261 //StatusCode should be refreshed as 405
262 response.setStatusCode(405);
263 //schedule a delete event
264 deferredDeleteProcess(processId);
265 return;
266 }
267 }
268
269 //read whether this process has total ends, if ends, remove control info from the maps
270 if (response.hasEndBlockId()) {
271 uint64_t nSegments =
272 response.getEndBlockId() - response.getStartBlockId() + 1;
273 if (response.getInsertNum() >= nSegments) {
274 //m_processes.erase(processId);
275 //All the data has been inserted, StatusCode is refreshed as 200
276 response.setStatusCode(200);
277 deferredDeleteProcess(processId);
278 return;
279 }
280 }
281
282 //check whether there is any credit
283 if (processCredit == 0)
284 return;
285
286
287 //check whether sent queue empty
288 if (nextSegmentQueue.empty()) {
289 //do not do anything
290 return;
291 }
292
293 //pop the queue
294 SegmentNo sendingSegment = nextSegmentQueue.front();
295 nextSegmentQueue.pop();
296
297 //check whether sendingSegment exceeds
298 if (sendingSegment > response.getEndBlockId()) {
299 //do not do anything
300 return;
301 }
302
303 //read whether this is retransmitted data;
304 SegmentNo fetchedSegment =
305 interest.getName().get(interest.getName().size() - 1).toSegment();
306
307 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
308
309 //find this fetched data, remove it from this map
310 //rit->second.erase(oit);
311 retryCounts.erase(fetchedSegment);
312 //express the interest of the top of the queue
313 Name fetchName(interest.getName().getPrefix(-1));
314 fetchName.appendSegment(sendingSegment);
315 Interest fetchInterest(fetchName);
316 getFace().expressInterest(fetchInterest,
317 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
318 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
319 //When an interest is expressed, processCredit--
320 processCredit--;
321 //std::cout << "sent seg: " << sendingSegment << std::endl;
322 if (retryCounts.count(sendingSegment) == 0) {
323 //not found
324 retryCounts[sendingSegment] = 0;
325 }
326 else {
327 //found
328 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
329 }
330 //increase the next seg and put it into the queue
331 if ((nextSegment + 1) <= response.getEndBlockId()) {
332 nextSegment++;
333 nextSegmentQueue.push(nextSegment);
334 }
335}
336
337void
338WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
339{
340 if (m_processes.count(processId) == 0) {
341 return;
342 }
343 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700344 // RepoCommandResponse& response = process.response;
345 // SegmentNo& nextSegment = process.nextSegment;
346 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700347 map<SegmentNo, int>& retryCounts = process.retryCounts;
348
349 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
350
351 std::cout << "timeoutSegment: " << timeoutSegment << std::endl;
352
353 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
354
355 //read the retry time. If retry out of time, fail the process. if not, plus
356 int& retryTime = retryCounts[timeoutSegment];
357 if (retryTime >= m_retryTime) {
358 //fail this process
359 std::cout << "Retry timeout: " << processId << std::endl;
360 m_processes.erase(processId);
361 return;
362 }
363 else {
364 //Reput it in the queue, retryTime++
365 retryTime++;
366 Interest retryInterest(interest.getName());
367 getFace().expressInterest(retryInterest,
368 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
369 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
370 }
371
372}
373
374void
375WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
376{
377 m_validator.validate(interest,
378 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
379 bind(&WriteHandle::onCheckValidationFailed, this, _1));
380
381}
382
383void
384WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
385{
386 RepoCommandParameter parameter;
387 try {
388 extractParameter(*interest, prefix, parameter);
389 }
390 catch (RepoCommandParameter::Error) {
391 negativeReply(*interest, 403);
392 return;
393 }
394
395 if (!parameter.hasProcessId()) {
396 negativeReply(*interest, 403);
397 return;
398 }
399 //check whether this process exists
400 ProcessId processId = parameter.getProcessId();
401 if (m_processes.count(processId) == 0) {
402 std::cout << "no such processId: " << processId << std::endl;
403 negativeReply(*interest, 404);
404 return;
405 }
406
407 ProcessInfo& process = m_processes[processId];
408
409 RepoCommandResponse& response = process.response;
410
411 //Check whether it is single data fetching
412 if (!response.hasStartBlockId() &&
413 !response.hasEndBlockId()) {
414 reply(*interest, response);
415 return;
416 }
417
418 //read if noEndtimeout
419 if (!response.hasEndBlockId()) {
420 extendNoEndTime(process);
421 reply(*interest, response);
422 return;
423 }
424 else {
425 reply(*interest, response);
426 }
427}
428
429void
430WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest)
431{
432 negativeReply(*interest, 401);
433}
434
435void
436WriteHandle::deferredDeleteProcess(ProcessId processId)
437{
438 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
439 ndn::bind(&WriteHandle::deleteProcess, this, processId));
440}
441
442void
443WriteHandle::processSingleInsertCommand(const Interest& interest,
444 RepoCommandParameter& parameter)
445{
446 ProcessId processId = generateProcessId();
447
448 ProcessInfo& process = m_processes[processId];
449
450 RepoCommandResponse& response = process.response;
451 response.setStatusCode(100);
452 response.setProcessId(processId);
453 response.setInsertNum(0);
454
455 reply(interest, response);
456
457 response.setStatusCode(300);
458
459 Interest fetchInterest(parameter.getName());
460 if (parameter.hasSelectors()) {
461 fetchInterest.setSelectors(parameter.getSelectors());
462 }
463 getFace().expressInterest(fetchInterest,
464 bind(&WriteHandle::onData, this, _1, _2, processId),
465 bind(&WriteHandle::onTimeout, this, _1, processId));
466}
467
468void
469WriteHandle::processSegmentedInsertCommand(const Interest& interest,
470 RepoCommandParameter& parameter)
471{
472 if (parameter.hasEndBlockId()) {
473 //normal fetch segment
474 if (!parameter.hasStartBlockId()) {
475 parameter.setStartBlockId(0);
476 }
477
478 SegmentNo startBlockId = parameter.getStartBlockId();
479 SegmentNo endBlockId = parameter.getEndBlockId();
480 //std::cout << "startBlockId: " << startBlockId << std::endl;
481 //std::cout << "endBlockId: " << endBlockId << std::endl;
482 if (startBlockId > endBlockId) {
483 negativeReply(interest, 403);
484 return;
485 }
486
487 ProcessId processId = generateProcessId();
488 ProcessInfo& process = m_processes[processId];
489 //std::cout << "processId: " << processId << std::endl;
490 RepoCommandResponse& response = process.response;
491 response.setStatusCode(100);
492 response.setProcessId(processId);
493 response.setInsertNum(0);
494 response.setStartBlockId(startBlockId);
495 response.setEndBlockId(endBlockId);
496
497 reply(interest, response);
498
499 //300 means data fetching is in progress
500 response.setStatusCode(300);
501
502 segInit(processId, parameter);
503 }
504 else {
505 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
506 ProcessId processId = generateProcessId();
507 ProcessInfo& process = m_processes[processId];
508 //std::cout << "processId: " << processId << std::endl;
509 RepoCommandResponse& response = process.response;
510 response.setStatusCode(100);
511 response.setProcessId(processId);
512 response.setInsertNum(0);
513 response.setStartBlockId(parameter.getStartBlockId());
514 reply(interest, response);
515
516 //300 means data fetching is in progress
517 response.setStatusCode(300);
518
519 segInit(processId, parameter);
520 }
521}
522
523void
524WriteHandle::extendNoEndTime(ProcessInfo& process)
525{
526 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
527 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
528 RepoCommandResponse& response = process.response;
529 if (now > noEndTime) {
530 response.setStatusCode(405);
531 return;
532 }
533 //extends noEndTime
534 process.noEndTime =
535 ndn::time::steady_clock::now() + m_noEndTimeout;
536
537}
538
539void
540WriteHandle::negativeReply(const Interest& interest, int statusCode)
541{
542 RepoCommandResponse response;
543 response.setStatusCode(statusCode);
544 reply(interest, response);
545}
546
547} //namespace repo