blob: 68fc2a67b028a668636d2c3cf94c8ce9cca8b753 [file] [log] [blame]
Shuo Chen29c77fe2014-03-18 11:29:41 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2013 Regents of the University of California.
4 * See COPYING for copyright and distribution information.
5 */
6
7#include "write-handle.hpp"
8
9namespace repo {
10
11static const int RETRY_TIMEOUT = 3;
12static const int DEFAULT_CREDIT = 12;
13static const ndn::time::milliseconds NOEND_TIMEOUT(10000);
14static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000);
15
16WriteHandle::WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
17 Scheduler& scheduler, CommandInterestValidator& validator)
18 : BaseHandle(face, storageHandle, keyChain, scheduler)
19 , m_validator(validator)
20 , m_retryTime(RETRY_TIMEOUT)
21 , m_credit(DEFAULT_CREDIT)
22 , m_noEndTimeout(NOEND_TIMEOUT)
23{
24}
25
26void
27WriteHandle::deleteProcess(ProcessId processId)
28{
29 m_processes.erase(processId);
30}
31
32// Interest.
33void
34WriteHandle::onInterest(const Name& prefix, const Interest& interest)
35{
36 m_validator.validate(interest,
37 bind(&WriteHandle::onValidated, this, _1, prefix),
38 bind(&WriteHandle::onValidationFailed, this, _1));
39}
40
41// onRegisterFailed.
42void
43WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
44{
45 throw Error("Insert prefix registration failed");
46}
47
48// onRegisterFailed for insert.
49void
50WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
51{
52 throw Error("Insert check prefix registration failed");
53}
54
55void
56WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
57{
58 //m_validResult = 1;
59 RepoCommandParameter parameter;
60 try {
61 extractParameter(*interest, prefix, parameter);
62 }
63 catch (RepoCommandParameter::Error) {
64 negativeReply(*interest, 403);
65 return;
66 }
67
68 if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
69 if (parameter.hasSelectors()) {
70 negativeReply(*interest, 402);
71 return;
72 }
73 processSegmentedInsertCommand(*interest, parameter);
74 }
75 else {
76 processSingleInsertCommand(*interest, parameter);
77 }
78
79}
80
81void
82WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest)
83{
84 std::cout << "invalidated" << std::endl;
85 negativeReply(*interest, 401);
86}
87
88void
89WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
90{
91 //std::cout << "onData" << std::endl;
92 //std::cout << "I: " << interest.toUri() << std::endl;
93 //std::cout << "D: " << data.getName().toUri() << std::endl;
94 if (m_processes.count(processId) == 0) {
95 return;
96 }
97
98 ProcessInfo& process = m_processes[processId];
99 RepoCommandResponse& response = process.response;
100
101 if (response.getInsertNum() == 0) {
102 getStorageHandle().insertData(data);
103 response.setInsertNum(1);
104 }
105
106 deferredDeleteProcess(processId);
107}
108
109void
110WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
111{
112 //std::cout << "I: " << interest.toUri() << std::endl;
113 //std::cout << "D: " << data.getName().toUri() << std::endl;
114 //retrieve the process from the responsemap
115
116 if (m_processes.count(processId) == 0) {
117 return;
118 }
119 RepoCommandResponse& response = m_processes[processId].response;
120
121 //refresh endBlockId
122 Name::Component finalBlockId = data.getFinalBlockId();
123
124 if (!finalBlockId.empty()) {
125 SegmentNo final = finalBlockId.toSegment();
126 if (response.hasEndBlockId()) {
127 if (final < response.getEndBlockId()) {
128 response.setEndBlockId(final);
129 }
130 }
131 else {
132 response.setEndBlockId(final);
133 }
134 }
135
136 //insert data
137 //std::cout << "start to insert" << std::endl;
138 if (getStorageHandle().insertData(data)) {
139 response.setInsertNum(response.getInsertNum() + 1);
140 }
141 //std::cout << "end of insert" << std::endl;
142
143 //it->second = response;
144
145 onSegmentDataControl(processId, interest);
146}
147
148void
149WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
150{
151 std::cout << "Timeout" << std::endl;
152 m_processes.erase(processId);
153}
154
155void
156WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
157{
158 std::cout << "SegTimeout" << std::endl;
159
160 onSegmentTimeoutControl(processId, interest);
161}
162
163void
164WriteHandle::listen(const Name& prefix)
165{
166 Name insertPrefix;
167 insertPrefix.append(prefix).append("insert");
168 getFace().setInterestFilter(insertPrefix,
169 bind(&WriteHandle::onInterest, this, _1, _2),
170 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
171 Name insertCheckPrefix;
172 insertCheckPrefix.append(prefix).append("insert check");
173 getFace().setInterestFilter(insertCheckPrefix,
174 bind(&WriteHandle::onCheckInterest, this, _1, _2),
175 bind(&WriteHandle::onRegisterFailed, this, _1, _2));
176}
177
178void
179WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
180{
181 ProcessInfo& process = m_processes[processId];
182 process.credit = 0;
183
184 map<SegmentNo, int>& processRetry = process.retryCounts;
185
186 Name name = parameter.getName();
187 SegmentNo startBlockId = parameter.getStartBlockId();
188
189 uint64_t initialCredit = m_credit;
190
191 if (parameter.hasEndBlockId()) {
192 initialCredit =
193 std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId());
194 }
195 else {
196 // set noEndTimeout timer
197 process.noEndTime = ndn::time::steady_clock::now() +
198 m_noEndTimeout;
199 }
200 process.credit = initialCredit;
201 SegmentNo segment = startBlockId;
202 for (; segment < startBlockId + initialCredit; ++segment) {
203 Name fetchName = name;
204 fetchName.appendSegment(segment);
205 Interest interest(fetchName);
206 //std::cout << "seg:" << j<<std::endl;
207 getFace().expressInterest(interest,
208 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
209 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
210 process.credit--;
211 processRetry[segment] = 0;
212 }
213
214 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
215
216 segment++;
217 process.nextSegment = segment;
218 nextSegmentQueue.push(segment);
219}
220
221void
222WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
223{
224 //std::cout << "onSegmentDataControl: " << processId << std::endl;
225
226 if (m_processes.count(processId) == 0) {
227 return;
228 }
229 ProcessInfo& process = m_processes[processId];
230 RepoCommandResponse& response = process.response;
231 int& processCredit = process.credit;
232 //onSegmentDataControl is called when a data returns.
233 //When data returns, processCredit++
234 processCredit++;
235 SegmentNo& nextSegment = process.nextSegment;
236 queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
237 map<SegmentNo, int>& retryCounts = process.retryCounts;
238
239 //read whether notime timeout
240 if (!response.hasEndBlockId()) {
241
242 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
243 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
244
245 if (now > noEndTime) {
246 std::cout << "noEndtimeout: " << processId << std::endl;
247 //m_processes.erase(processId);
248 //StatusCode should be refreshed as 405
249 response.setStatusCode(405);
250 //schedule a delete event
251 deferredDeleteProcess(processId);
252 return;
253 }
254 }
255
256 //read whether this process has total ends, if ends, remove control info from the maps
257 if (response.hasEndBlockId()) {
258 uint64_t nSegments =
259 response.getEndBlockId() - response.getStartBlockId() + 1;
260 if (response.getInsertNum() >= nSegments) {
261 //m_processes.erase(processId);
262 //All the data has been inserted, StatusCode is refreshed as 200
263 response.setStatusCode(200);
264 deferredDeleteProcess(processId);
265 return;
266 }
267 }
268
269 //check whether there is any credit
270 if (processCredit == 0)
271 return;
272
273
274 //check whether sent queue empty
275 if (nextSegmentQueue.empty()) {
276 //do not do anything
277 return;
278 }
279
280 //pop the queue
281 SegmentNo sendingSegment = nextSegmentQueue.front();
282 nextSegmentQueue.pop();
283
284 //check whether sendingSegment exceeds
285 if (sendingSegment > response.getEndBlockId()) {
286 //do not do anything
287 return;
288 }
289
290 //read whether this is retransmitted data;
291 SegmentNo fetchedSegment =
292 interest.getName().get(interest.getName().size() - 1).toSegment();
293
294 BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
295
296 //find this fetched data, remove it from this map
297 //rit->second.erase(oit);
298 retryCounts.erase(fetchedSegment);
299 //express the interest of the top of the queue
300 Name fetchName(interest.getName().getPrefix(-1));
301 fetchName.appendSegment(sendingSegment);
302 Interest fetchInterest(fetchName);
303 getFace().expressInterest(fetchInterest,
304 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
305 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
306 //When an interest is expressed, processCredit--
307 processCredit--;
308 //std::cout << "sent seg: " << sendingSegment << std::endl;
309 if (retryCounts.count(sendingSegment) == 0) {
310 //not found
311 retryCounts[sendingSegment] = 0;
312 }
313 else {
314 //found
315 retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
316 }
317 //increase the next seg and put it into the queue
318 if ((nextSegment + 1) <= response.getEndBlockId()) {
319 nextSegment++;
320 nextSegmentQueue.push(nextSegment);
321 }
322}
323
324void
325WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
326{
327 if (m_processes.count(processId) == 0) {
328 return;
329 }
330 ProcessInfo& process = m_processes[processId];
Shuo Chen09f09bb2014-03-18 15:37:11 -0700331 // RepoCommandResponse& response = process.response;
332 // SegmentNo& nextSegment = process.nextSegment;
333 // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
Shuo Chen29c77fe2014-03-18 11:29:41 -0700334 map<SegmentNo, int>& retryCounts = process.retryCounts;
335
336 SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
337
338 std::cout << "timeoutSegment: " << timeoutSegment << std::endl;
339
340 BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
341
342 //read the retry time. If retry out of time, fail the process. if not, plus
343 int& retryTime = retryCounts[timeoutSegment];
344 if (retryTime >= m_retryTime) {
345 //fail this process
346 std::cout << "Retry timeout: " << processId << std::endl;
347 m_processes.erase(processId);
348 return;
349 }
350 else {
351 //Reput it in the queue, retryTime++
352 retryTime++;
353 Interest retryInterest(interest.getName());
354 getFace().expressInterest(retryInterest,
355 bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
356 bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
357 }
358
359}
360
361void
362WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
363{
364 m_validator.validate(interest,
365 bind(&WriteHandle::onCheckValidated, this, _1, prefix),
366 bind(&WriteHandle::onCheckValidationFailed, this, _1));
367
368}
369
370void
371WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
372{
373 RepoCommandParameter parameter;
374 try {
375 extractParameter(*interest, prefix, parameter);
376 }
377 catch (RepoCommandParameter::Error) {
378 negativeReply(*interest, 403);
379 return;
380 }
381
382 if (!parameter.hasProcessId()) {
383 negativeReply(*interest, 403);
384 return;
385 }
386 //check whether this process exists
387 ProcessId processId = parameter.getProcessId();
388 if (m_processes.count(processId) == 0) {
389 std::cout << "no such processId: " << processId << std::endl;
390 negativeReply(*interest, 404);
391 return;
392 }
393
394 ProcessInfo& process = m_processes[processId];
395
396 RepoCommandResponse& response = process.response;
397
398 //Check whether it is single data fetching
399 if (!response.hasStartBlockId() &&
400 !response.hasEndBlockId()) {
401 reply(*interest, response);
402 return;
403 }
404
405 //read if noEndtimeout
406 if (!response.hasEndBlockId()) {
407 extendNoEndTime(process);
408 reply(*interest, response);
409 return;
410 }
411 else {
412 reply(*interest, response);
413 }
414}
415
416void
417WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest)
418{
419 negativeReply(*interest, 401);
420}
421
422void
423WriteHandle::deferredDeleteProcess(ProcessId processId)
424{
425 getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
426 ndn::bind(&WriteHandle::deleteProcess, this, processId));
427}
428
429void
430WriteHandle::processSingleInsertCommand(const Interest& interest,
431 RepoCommandParameter& parameter)
432{
433 ProcessId processId = generateProcessId();
434
435 ProcessInfo& process = m_processes[processId];
436
437 RepoCommandResponse& response = process.response;
438 response.setStatusCode(100);
439 response.setProcessId(processId);
440 response.setInsertNum(0);
441
442 reply(interest, response);
443
444 response.setStatusCode(300);
445
446 Interest fetchInterest(parameter.getName());
447 if (parameter.hasSelectors()) {
448 fetchInterest.setSelectors(parameter.getSelectors());
449 }
450 getFace().expressInterest(fetchInterest,
451 bind(&WriteHandle::onData, this, _1, _2, processId),
452 bind(&WriteHandle::onTimeout, this, _1, processId));
453}
454
455void
456WriteHandle::processSegmentedInsertCommand(const Interest& interest,
457 RepoCommandParameter& parameter)
458{
459 if (parameter.hasEndBlockId()) {
460 //normal fetch segment
461 if (!parameter.hasStartBlockId()) {
462 parameter.setStartBlockId(0);
463 }
464
465 SegmentNo startBlockId = parameter.getStartBlockId();
466 SegmentNo endBlockId = parameter.getEndBlockId();
467 //std::cout << "startBlockId: " << startBlockId << std::endl;
468 //std::cout << "endBlockId: " << endBlockId << std::endl;
469 if (startBlockId > endBlockId) {
470 negativeReply(interest, 403);
471 return;
472 }
473
474 ProcessId processId = generateProcessId();
475 ProcessInfo& process = m_processes[processId];
476 //std::cout << "processId: " << processId << std::endl;
477 RepoCommandResponse& response = process.response;
478 response.setStatusCode(100);
479 response.setProcessId(processId);
480 response.setInsertNum(0);
481 response.setStartBlockId(startBlockId);
482 response.setEndBlockId(endBlockId);
483
484 reply(interest, response);
485
486 //300 means data fetching is in progress
487 response.setStatusCode(300);
488
489 segInit(processId, parameter);
490 }
491 else {
492 //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
493 ProcessId processId = generateProcessId();
494 ProcessInfo& process = m_processes[processId];
495 //std::cout << "processId: " << processId << std::endl;
496 RepoCommandResponse& response = process.response;
497 response.setStatusCode(100);
498 response.setProcessId(processId);
499 response.setInsertNum(0);
500 response.setStartBlockId(parameter.getStartBlockId());
501 reply(interest, response);
502
503 //300 means data fetching is in progress
504 response.setStatusCode(300);
505
506 segInit(processId, parameter);
507 }
508}
509
510void
511WriteHandle::extendNoEndTime(ProcessInfo& process)
512{
513 ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
514 ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
515 RepoCommandResponse& response = process.response;
516 if (now > noEndTime) {
517 response.setStatusCode(405);
518 return;
519 }
520 //extends noEndTime
521 process.noEndTime =
522 ndn::time::steady_clock::now() + m_noEndTimeout;
523
524}
525
526void
527WriteHandle::negativeReply(const Interest& interest, int statusCode)
528{
529 RepoCommandResponse response;
530 response.setStatusCode(statusCode);
531 reply(interest, response);
532}
533
534} //namespace repo